Block deletion with ref count & repostore refactor (#631)

This commit is contained in:
Tomasz Bekas 2024-06-21 00:46:06 +02:00 committed by GitHub
parent 1a57341b7d
commit ec7faa21b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1017 additions and 864 deletions

View File

@ -258,7 +258,7 @@ proc new*(
repoDs = repoData, repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"), .expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota.uint, quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl) blockTtl = config.blockTtl)
maintenance = BlockMaintainer.new( maintenance = BlockMaintainer.new(

View File

@ -14,6 +14,8 @@ push: {.upraises: [].}
import pkg/libp2p import pkg/libp2p
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/byteutils
import pkg/serde/json
import ../../units import ../../units
import ../../errors import ../../errors
@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof =
nodes.add node nodes.add node
CodexProof.init(mcodec, index.int, nleaves.int, nodes) CodexProof.init(mcodec, index.int, nleaves.int, nodes)
proc fromJson*(
_: type CodexProof,
json: JsonNode
): ?!CodexProof =
expectJsonKind(Cid, JString, json)
var bytes: seq[byte]
try:
bytes = hexToSeqByte(json.str)
except ValueError as err:
return failure(err)
CodexProof.decode(bytes)
func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode())

View File

@ -7,6 +7,7 @@ import ../sales
import ../purchasing import ../purchasing
import ../utils/json import ../utils/json
import ../manifest import ../manifest
import ../units
export json export json
@ -65,10 +66,10 @@ type
id*: NodeId id*: NodeId
RestRepoStore* = object RestRepoStore* = object
totalBlocks* {.serialize.}: uint totalBlocks* {.serialize.}: Natural
quotaMaxBytes* {.serialize.}: uint quotaMaxBytes* {.serialize.}: NBytes
quotaUsedBytes* {.serialize.}: uint quotaUsedBytes* {.serialize.}: NBytes
quotaReservedBytes* {.serialize.}: uint quotaReservedBytes* {.serialize.}: NBytes
proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList = proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList =
RestContentList( RestContentList(

View File

@ -46,6 +46,7 @@ import ../stores
import ../market import ../market
import ../contracts/requests import ../contracts/requests
import ../utils/json import ../utils/json
import ../units
export requests export requests
export logutils export logutils
@ -178,16 +179,16 @@ func key*(availability: Availability): ?!Key =
func key*(reservation: Reservation): ?!Key = func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId) return key(reservation.id, reservation.availabilityId)
func available*(self: Reservations): uint = self.repo.available func available*(self: Reservations): uint = self.repo.available.uint
func hasAvailable*(self: Reservations, bytes: uint): bool = func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes) self.repo.available(bytes.NBytes)
proc exists*( proc exists*(
self: Reservations, self: Reservations,
key: Key): Future[bool] {.async.} = key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key) let exists = await self.repo.metaDs.ds.contains(key)
return exists return exists
proc getImpl( proc getImpl(
@ -198,7 +199,7 @@ proc getImpl(
let err = newException(NotExistsError, "object with key " & $key & " does not exist") let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err) return failure(err)
without serialized =? await self.repo.metaDs.get(key), error: without serialized =? await self.repo.metaDs.ds.get(key), error:
return failure(error.toErr(GetFailedError)) return failure(error.toErr(GetFailedError))
return success serialized return success serialized
@ -225,7 +226,7 @@ proc updateImpl(
without key =? obj.key, error: without key =? obj.key, error:
return failure(error) return failure(error)
if err =? (await self.repo.metaDs.put( if err =? (await self.repo.metaDs.ds.put(
key, key,
@(obj.toJson.toBytes) @(obj.toJson.toBytes)
)).errorOption: )).errorOption:
@ -268,11 +269,11 @@ proc updateAvailability(
if oldAvailability.totalSize != obj.totalSize: if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation" trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError)) return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj) let res = await self.updateImpl(obj)
@ -315,7 +316,7 @@ proc delete(
if not await self.exists(key): if not await self.exists(key):
return success() return success()
if err =? (await self.repo.metaDs.delete(key)).errorOption: if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError)) return failure(err.toErr(DeleteFailedError))
return success() return success()
@ -355,7 +356,7 @@ proc deleteReservation*(
if updateErr =? (await self.updateAvailability(availability)).errorOption: if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr) return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption: if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError)) return failure(err.toErr(DeleteFailedError))
return success() return success()
@ -377,14 +378,14 @@ proc createAvailability*(
) )
let bytes = availability.freeSize.truncate(uint) let bytes = availability.freeSize.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption: if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption: if updateErr =? (await self.update(availability)).errorOption:
# rollback the reserve # rollback the reserve
trace "rolling back reserve" trace "rolling back reserve"
if rollbackErr =? (await self.repo.release(bytes)).errorOption: if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
rollbackErr.parent = updateErr rollbackErr.parent = updateErr
return failure(rollbackErr) return failure(rollbackErr)
@ -473,7 +474,7 @@ proc returnBytesToAvailability*(
# First lets see if we can re-reserve the bytes, if the Repo's quota # First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm. # is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error: without availabilityKey =? availabilityId.key, error:
@ -488,7 +489,7 @@ proc returnBytesToAvailability*(
if updateErr =? (await self.updateAvailability(availability)).errorOption: if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes" trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
rollbackErr.parent = updateErr rollbackErr.parent = updateErr
return failure(rollbackErr) return failure(rollbackErr)
@ -522,7 +523,7 @@ proc release*(
"trying to release an amount of bytes that is greater than the total size of the Reservation") "trying to release an amount of bytes that is greater than the total size of the Reservation")
return failure(error) return failure(error)
if releaseErr =? (await self.repo.release(bytes)).errorOption: if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError)) return failure(releaseErr.toErr(ReleaseFailedError))
reservation.size -= bytes.u256 reservation.size -= bytes.u256
@ -532,7 +533,7 @@ proc release*(
# rollback release if an update error encountered # rollback release if an update error encountered
trace "rolling back release" trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
rollbackErr.parent = err rollbackErr.parent = err
return failure(rollbackErr) return failure(rollbackErr)
return failure(err) return failure(err)
@ -562,7 +563,7 @@ proc storables(
else: else:
raiseAssert "unknown type" raiseAssert "unknown type"
without results =? await self.repo.metaDs.query(query), error: without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error) return failure(error)
# /sales/reservations # /sales/reservations
@ -666,4 +667,3 @@ proc findAvailability*(
duration, availDuration = availability.duration, duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice, minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral collateral, availMaxCollateral = availability.maxCollateral

View File

@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.}
trace "Unable to delete block from repoStore" trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now: if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid) await self.deleteExpiredBlock(be.cid)
else: else:
inc self.offset inc self.offset
@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
return return
var numberReceived = 0 var numberReceived = 0
for maybeBeFuture in iter: for beFut in iter:
if be =? await maybeBeFuture: let be = await beFut
inc numberReceived inc numberReceived
await self.processBlockExpiration(be) await self.processBlockExpiration(be)
await sleepAsync(50.millis) await sleepAsync(1.millis) # cooperative scheduling
# If we received fewer blockExpirations from the iterator than we asked for, # If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time. # We're at the end of the dataset and should start from 0 next time.

View File

@ -6,7 +6,7 @@ import pkg/datastore/typedds
import ../utils/asynciter import ../utils/asynciter
type KeyVal[T] = tuple[key: Key, value: T] type KeyVal*[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T]( proc toAsyncIter*[T](
queryIter: QueryIter[T], queryIter: QueryIter[T],

View File

@ -1,679 +1,5 @@
## Nim-Codex import ./repostore/store
## Copyright (c) 2022 Status Research & Development GmbH import ./repostore/types
## Licensed under either of import ./repostore/coders
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises export store, types, coders
push: {.upraises: [].}
import pkg/chronos
import pkg/chronos/futures
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ../blocktype
import ../clock
import ../systemclock
import ../logutils
import ../merkletree
import ../utils
export blocktype, cid
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
type
QuotaUsedError* = object of CodexError
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: Datastore
clock: Clock
totalBlocks*: uint # number of blocks in the store
quotaMaxBytes*: uint # maximum available bytes
quotaUsedBytes*: uint # bytes used by the repo
quotaReservedBytes*: uint # bytes reserved by the repo
blockTtl*: Duration
started*: bool
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): uint =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE
@nBytes & cidBytes & proofBytes
proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? CodexProof.decode(data[sizeof(uint64) + n..^1])
success((cid, proof))
proc decodeCid(_: type (Cid, CodexProof), data: seq[byte]): ?!Cid =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
success(cid)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blockCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Storing block cid and proof", blockCid, key
let value = (blockCid, proof).encode()
await self.metaDs.put(key, value)
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!(Cid, CodexProof)] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
error "Unable to decode cid and proof", err = err.msg
return failure(err)
return success (cid, proof)
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!Cid] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
# This failure is expected to happen frequently:
# NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine.
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Error getting cid from datastore", err = err.msg, key
return failure(err)
return (Cid, CodexProof).decodeCid(value)
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
error "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
error "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
let (cid, proof) = cidAndProof
without blk =? await self.getBlock(cid), err:
return failure(err)
success((blk, proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
return failure(err)
await self.getBlock(cid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: SecondsSince1970): ?!BatchEntry =
## Get an expiration entry for a batch with timestamp
##
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return success((key, ttl.toBytes))
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: ?Duration): ?!BatchEntry =
## Get an expiration entry for a batch for duration since "now"
##
let duration = ttl |? self.blockTtl
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
logScope:
cid = cid
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
without expiryKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without currentExpiry =? await self.metaDs.get(expiryKey), err:
if err of DatastoreKeyNotFound:
error "No current expiry exists for the block"
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Could not read datastore key", err = err.msg
return failure(err)
logScope:
current = currentExpiry.toSecondsSince1970
ensuring = expiry
if expiry <= currentExpiry.toSecondsSince1970:
trace "Expiry is larger than or equal to requested"
return success()
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
trace "Error updating expiration metadata entry", err = err.msg
return failure(err)
return success()
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
await self.ensureExpiry(cidAndProof[0], expiry)
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
if err =? (await self.metaDs.put(
CodexTotalBlocksKey,
@(self.totalBlocks.uint64.toBytesBE))).errorOption:
trace "Error total blocks key!", err = err.msg
return failure(err)
return success()
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
without key =? makePrefixKey(self.postFixLen, blk.cid), err:
warn "Error getting key from provider", err = err.msg
return failure(err)
if await key in self.repoDs:
trace "Block already in store", cid = blk.cid
return success()
if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes:
error "Cannot store block, quota used!", used = self.totalUsed
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))
var
batch: seq[BatchEntry]
let
used = self.quotaUsedBytes + blk.data.len.uint
if err =? (await self.repoDs.put(key, blk.data)).errorOption:
error "Error storing block", err = err.msg
return failure(err)
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
warn "Unable to create block expiration metadata key", err = err.msg
return failure(err)
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption:
error "Error updating quota bytes", err = err.msg
if err =? (await self.repoDs.delete(key)).errorOption:
error "Error deleting block after failed quota update", err = err.msg
return failure(err)
return failure(err)
self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
warn "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
let used = self.quotaUsedBytes - blk.data.len.uint
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
self.updateMetrics()
return success()
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return await self.metaDs.delete(key)
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
logScope:
cid = cid
trace "Deleting block"
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
if blk =? (await self.getBlock(cid)):
if key =? makePrefixKey(self.postFixLen, cid) and
err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block!", err = err.msg
return failure(err)
if isErr (await self.updateQuotaBytesUsed(blk)):
trace "Unable to update quote-bytes-used in metadata store"
return failure("Unable to update quote-bytes-used in metadata store")
if isErr (await self.removeBlockExpirationEntry(blk.cid)):
trace "Unable to remove block expiration entry from metadata store"
return failure("Unable to remove block expiration entry from metadata store")
trace "Deleted block", cid, totalUsed = self.totalUsed
dec self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Fetching proof", key
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return success()
else:
return failure(err)
without cid =? (Cid, CodexProof).decodeCid(value), err:
return failure(err)
trace "Deleting block", cid
if err =? (await self.delBlock(cid)).errorOption:
return failure(err)
await self.metaDs.delete(key)
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(cid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
## Get block expirations from the given RepoStore
##
without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query"
return failure(err)
without queryIter =? (await self.metaDs.query(query)), err:
trace "Unable to execute block expirations query"
return failure(err)
var iter = AsyncIter[?BlockExpiration]()
proc next(): Future[?BlockExpiration] {.async.} =
if not queryIter.finished:
if pair =? (await queryIter.next()) and blockKey =? pair.key:
let expirationTimestamp = pair.data
let cidResult = Cid.init(blockKey.value)
if not cidResult.isOk:
raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
return BlockExpiration(
cid: cidResult.get,
expiration: expirationTimestamp.toSecondsSince1970
).some
else:
discard await queryIter.dispose()
iter.finish
return BlockExpiration.none
iter.next = next
return success iter
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes
if (self.totalUsed + bytes) > self.quotaMaxBytes:
trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes
return failure(
newException(QuotaNotEnoughError, "Not enough storage quota to reserver"))
self.quotaReservedBytes += bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error reserving bytes", err = err.msg
self.quotaReservedBytes += bytes
return failure(err)
return success()
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes
if (self.quotaReservedBytes.int - bytes.int) < 0:
trace "Cannot release this many bytes",
quotaReservedBytes = self.quotaReservedBytes, bytes
return failure("Cannot release this many bytes")
self.quotaReservedBytes -= bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error releasing bytes", err = err.msg
self.quotaReservedBytes -= bytes
return failure(err)
trace "Released bytes", bytes
self.updateMetrics()
return success()
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting repo"
without total =? await self.metaDs.get(CodexTotalBlocksKey), err:
if not (err of DatastoreKeyNotFound):
error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey
if total.len > 0:
self.totalBlocks = uint64.fromBytesBE(total).uint
trace "Number of blocks in store at start", total = self.totalBlocks
## load current persist and cache bytes from meta ds
without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting cache bytes from datastore",
err = err.msg, key = $QuotaUsedKey
raise newException(Defect, err.msg)
if quotaUsedBytes.len > 0:
self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint
notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes
without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting persist bytes from datastore",
err = err.msg, key = $QuotaReservedKey
raise newException(Defect, err.msg)
if quotaReservedBytes.len > 0:
self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint
if self.quotaUsedBytes > self.quotaMaxBytes:
raiseAssert "All storage quota used, increase storage quota!"
notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes
self.updateMetrics()
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: metaDs,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)

View File

@ -0,0 +1,47 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
##
import std/sugar
import pkg/libp2p/cid
import pkg/serde/json
import pkg/stew/byteutils
import pkg/stew/endians2
import ./types
import ../../errors
import ../../merkletree
import ../../utils/json
proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes()
proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(i: uint64): seq[byte] =
@(i.toBytesBE)
proc decode*(T: type uint64, bytes: seq[byte]): ?!T =
if bytes.len >= sizeof(uint64):
success(uint64.fromBytesBE(bytes))
else:
failure("Not enough bytes to decode `uint64`")
proc encode*(i: Natural | enum): seq[byte] = cast[uint64](i).encode
proc decode*(T: typedesc[Natural | enum], bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[T](ui))

View File

@ -0,0 +1,213 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ../blockstore
import ../keyutils
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
await self.metaDs.modifyGet(key,
proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} =
var
md: LeafMetadata
res: StoreResultKind
if currMd =? maybeCurrMd:
md = currMd
res = AlreadyInStore
else:
md = LeafMetadata(blkCid: blkCid, proof: proof)
res = Stored
(md.some, res)
)
proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without leafMd =? await get[LeafMetadata](self.metaDs, key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
success(leafMd)
proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} =
await self.metaDs.modify(CodexTotalBlocksKey,
proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
currCount + plusCount - minusCount
else:
plusCount - minusCount
self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
count.some
)
proc updateQuotaUsage*(
self: RepoStore,
plusUsed: NBytes = 0.NBytes,
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes
): Future[?!void] {.async.} =
await self.metaDs.modify(QuotaUsedKey,
proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
var usage: QuotaUsage
if currUsage =? maybeCurrUsage:
usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved)
else:
usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
if usage.used + usage.reserved > self.quotaMaxBytes:
raise newException(QuotaNotEnoughError,
"Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
$usage.reserved & ", limit: " & $self.quotaMaxBytes)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some
)
proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,
plusRefCount: Natural = 0,
minusRefCount: Natural = 0,
minExpiry: SecondsSince1970 = 0
): Future[?!void] {.async.} =
if cid.isEmpty:
return success()
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
await self.metaDs.modify(metaKey,
proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} =
if currBlockMd =? maybeCurrBlockMd:
BlockMetadata(
size: currBlockMd.size,
expiry: max(currBlockMd.expiry, minExpiry),
refCount: currBlockMd.refCount + plusRefCount - minusRefCount
).some
else:
raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found")
)
proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} =
var
md: BlockMetadata
res: StoreResult
if currMd =? maybeCurrMd:
if currMd.size == blk.data.len.NBytes:
md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount)
res = StoreResult(kind: AlreadyInStore)
# making sure that the block acutally is stored in the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if not hasBlock:
warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
else:
raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid)
else:
md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0)
res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
(md.some, res)
)
proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} =
if cid.isEmpty:
return success(DeleteResult(kind: InUse))
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} =
var
maybeMeta: ?BlockMetadata
res: DeleteResult
if currMd =? maybeCurrMd:
if currMd.refCount == 0 or currMd.expiry < expiryLimit:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: Deleted, released: currMd.size)
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
else:
maybeMeta = currMd.some
res = DeleteResult(kind: InUse)
else:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: NotFound)
# making sure that the block acutally is removed from the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if hasBlock:
warn "Block metadata is absent, but block is present. Removing block.", cid
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
(maybeMeta, res)
)

View File

@ -0,0 +1,395 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/[cid, multicodec]
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ./operations
import ../blockstore
import ../keyutils
import ../queryiterhelper
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
import ../../utils
export blocktype, cid
logScope:
topics = "codex repostore"
###########################################################
# BlockStore API
###########################################################
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
trace "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
without blk =? await self.getBlock(leafMd.blkCid), err:
return failure(err)
success((blk, leafMd.proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.getBlock(leafMd.blkCid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
await self.updateBlockMetadata(cid, minExpiry = expiry)
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blkCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
treeCid = treeCid
index = index
blkCid = blkCid
trace "Storing LeafMetadata"
without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err:
return failure(err)
if blkCid.mcodec == BlockCodec:
if res == Stored:
if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
return failure(err)
trace "Leaf metadata stored, block refCount incremented"
else:
trace "Leaf metadata already exists"
return success()
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!(Cid, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success((leafMd.blkCid, leafMd.proof))
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!Cid] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success(leafMd.blkCid)
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds
without res =? await self.storeBlock(blk, expiry), err:
return failure(err)
if res.kind == Stored:
trace "Block Stored"
if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# rollback changes
without delRes =? await self.tryDeleteBlock(blk.cid), err:
return failure(err)
return failure(err)
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err)
else:
trace "Block already exists"
return success()
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore when block refCount is 0 or block is expired
##
logScope:
cid = cid
trace "Attempting to delete a block"
without res =? await self.tryDeleteBlock(cid, self.clock.now()), err:
return failure(err)
if res.kind == Deleted:
trace "Block deleted"
if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
return failure(err)
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
return failure(err)
elif res.kind == InUse:
trace "Block in use, refCount > 0 and not expired"
else:
trace "Block not found in store"
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success()
else:
return failure(err)
if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption:
if not (err of BlockNotFoundError):
return failure(err)
await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(leafMd.blkCid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
## Get iterator with block expirations
##
without beQuery =? createBlockExpirationQuery(maxNumber, offset), err:
error "Unable to format block expirations query", err = err.msg
return failure(err)
without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter,
proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
###########################################################
# RepoStore procs
###########################################################
proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", bytes
await self.updateQuotaUsage(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", bytes
await self.updateQuotaUsage(minusReserved = bytes)
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting rep"
if err =? (await self.updateTotalBlocksCount()).errorOption:
raise newException(CodexError, err.msg)
if err =? (await self.updateQuotaUsage()).errorOption:
raise newException(CodexError, err.msg)
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false

View File

@ -0,0 +1,107 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import ../blockstore
import ../../clock
import ../../errors
import ../../merkletree
import ../../systemclock
import ../../units
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 8.GiBs
type
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage
totalBlocks*: Natural
blockTtl*: Duration
started*: bool
QuotaUsage* {.serialize.} = object
used*: NBytes
reserved*: NBytes
BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
size*: NBytes
refCount*: Natural
LeafMetadata* {.serialize.} = object
blkCid*: Cid
proof*: CodexProof
BlockExpiration* {.serialize.} = object
cid*: Cid
expiry*: SecondsSince1970
DeleteResultKind* {.serialize.} = enum
Deleted = 0, # block removed from store
InUse = 1, # block not removed, refCount > 0 and not expired
NotFound = 2 # block not found in store
DeleteResult* {.serialize.} = object
kind*: DeleteResultKind
released*: NBytes
StoreResultKind* {.serialize.} = enum
Stored = 0, # new block stored
AlreadyInStore = 1 # block already in store
StoreResult* {.serialize.} = object
kind*: StoreResultKind
used*: NBytes
func quotaUsedBytes*(self: RepoStore): NBytes =
self.quotaUsage.used
func quotaReservedBytes*(self: RepoStore): NBytes =
self.quotaUsage.reserved
func totalUsed*(self: RepoStore): NBytes =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): NBytes =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: NBytes): bool =
return bytes < self.available()
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: TypedDatastore.init(metaDs),
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)

View File

@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes
logutils.formatIt(NBytes): $it logutils.formatIt(NBytes): $it
const const
MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz
MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz
GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz
proc KiBs*(v: Natural): NBytes = v.NBytes * KiB
proc MiBs*(v: Natural): NBytes = v.NBytes * MiB proc MiBs*(v: Natural): NBytes = v.NBytes * MiB
proc GiBs*(v: Natural): NBytes = v.NBytes * GiB
func divUp*[T: NBytes](a, b : T): int = func divUp*[T: NBytes](a, b : T): int =
## Division with result rounded up (rather than truncated as in 'div') ## Division with result rounded up (rather than truncated as in 'div')

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import std/sequtils import std/sequtils
import std/sugar
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/questionable import pkg/questionable
@ -24,33 +25,28 @@ type
testBlockExpirations*: seq[BlockExpiration] testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool getBlockExpirationsThrows*: bool
iteratorIndex: int
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =
self.delBlockCids.add(cid) self.delBlockCids.add(cid)
self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid)
dec self.iteratorIndex
return success() return success()
method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} =
if self.getBlockExpirationsThrows: if self.getBlockExpirationsThrows:
raise new CatchableError raise new CatchableError
self.getBeMaxNumber = maxNumber self.getBeMaxNumber = maxNumber
self.getBeOffset = offset self.getBeOffset = offset
var iter = AsyncIter[?BlockExpiration]() let
testBlockExpirationsCpy = @(self.testBlockExpirations)
limit = min(offset + maxNumber, len(testBlockExpirationsCpy))
self.iteratorIndex = offset let
var numberLeft = maxNumber iter1 = AsyncIter[int].new(offset..<limit)
proc next(): Future[?BlockExpiration] {.async.} = iter2 = map[int, BlockExpiration](iter1,
if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): proc (i: int): Future[BlockExpiration] {.async.} =
dec numberLeft testBlockExpirationsCpy[i]
let selectedBlock = self.testBlockExpirations[self.iteratorIndex] )
inc self.iteratorIndex
return selectedBlock.some
iter.finish
return BlockExpiration.none
iter.next = next success(iter2)
return success iter

View File

@ -5,7 +5,6 @@ import std/cpuinfo
import pkg/libp2p import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/taskpools import pkg/taskpools
import pkg/codex/codextypes import pkg/codex/codextypes
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/stores import pkg/codex/stores

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stint import pkg/stint
@ -32,6 +33,7 @@ import pkg/codex/discovery
import pkg/codex/erasure import pkg/codex/erasure
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore/coders
import pkg/codex/utils/asynciter import pkg/codex/utils/asynciter
import pkg/codex/indexingstrategy import pkg/codex/indexingstrategy
@ -110,10 +112,11 @@ asyncchecksuite "Test Node - Host contracts":
for index in 0..<manifest.blocksCount: for index in 0..<manifest.blocksCount:
let let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey) bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry check blkMd.expiry == expectedExpiry
test "onStore callback is set": test "onStore callback is set":
check sales.onStore.isSome check sales.onStore.isSome
@ -139,7 +142,8 @@ asyncchecksuite "Test Node - Host contracts":
for index in indexer.getIndicies(1): for index in indexer.getIndicies(1):
let let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey) bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970 check blkMd.expiry == request.expiry.toSecondsSince1970

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stint import pkg/stint

View File

@ -77,9 +77,9 @@ asyncchecksuite "Reservations module":
check availability.id != AvailabilityId.default check availability.id != AvailabilityId.default
test "creating availability reserves bytes in repo": test "creating availability reserves bytes in repo":
let orig = repo.available let orig = repo.available.uint
let availability = createAvailability() let availability = createAvailability()
check repo.available == (orig.u256 - availability.freeSize).truncate(uint) check repo.available.uint == (orig.u256 - availability.freeSize).truncate(uint)
test "can get all availabilities": test "can get all availabilities":
let availability1 = createAvailability() let availability1 = createAvailability()
@ -248,7 +248,7 @@ asyncchecksuite "Reservations module":
check updated.freeSize > orig check updated.freeSize > orig
check (updated.freeSize - orig) == 200.u256 check (updated.freeSize - orig) == 200.u256
check (repo.quotaReservedBytes - origQuota) == 200 check (repo.quotaReservedBytes - origQuota) == 200.NBytes
test "update releases quota when lowering size": test "update releases quota when lowering size":
let let
@ -257,7 +257,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize - 100 availability.totalSize = availability.totalSize - 100
check isOk await reservations.update(availability) check isOk await reservations.update(availability)
check (origQuota - repo.quotaReservedBytes) == 100 check (origQuota - repo.quotaReservedBytes) == 100.NBytes
test "update reserves quota when growing size": test "update reserves quota when growing size":
let let
@ -266,7 +266,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize + 100 availability.totalSize = availability.totalSize + 100
check isOk await reservations.update(availability) check isOk await reservations.update(availability)
check (repo.quotaReservedBytes - origQuota) == 100 check (repo.quotaReservedBytes - origQuota) == 100.NBytes
test "reservation can be partially released": test "reservation can be partially released":
let availability = createAvailability() let availability = createAvailability()
@ -370,17 +370,17 @@ asyncchecksuite "Reservations module":
check got.error of NotExistsError check got.error of NotExistsError
test "can get available bytes in repo": test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes check reservations.available == DefaultQuotaBytes.uint
test "reports quota available to be reserved": test "reports quota available to be reserved":
check reservations.hasAvailable(DefaultQuotaBytes - 1) check reservations.hasAvailable(DefaultQuotaBytes.uint - 1)
test "reports quota not available to be reserved": test "reports quota not available to be reserved":
check not reservations.hasAvailable(DefaultQuotaBytes + 1) check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1)
test "fails to create availability with size that is larger than available quota": test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability( let created = await reservations.createAvailability(
(DefaultQuotaBytes + 1).u256, (DefaultQuotaBytes.uint + 1).u256,
UInt256.example, UInt256.example,
UInt256.example, UInt256.example,
UInt256.example UInt256.example

View File

@ -2,7 +2,7 @@ import std/sequtils
import std/sugar import std/sugar
import std/times import std/times
import pkg/chronos import pkg/chronos
import pkg/datastore import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/codex/sales import pkg/codex/sales

View File

@ -0,0 +1,71 @@
import std/unittest
import std/random
import pkg/stew/objects
import pkg/questionable
import pkg/questionable/results
import pkg/codex/clock
import pkg/codex/stores/repostore/types
import pkg/codex/stores/repostore/coders
import ../../helpers
checksuite "Test coders":
proc rand(T: type NBytes): T =
rand(Natural).NBytes
proc rand(E: type[enum]): E =
let ordinals = enumRangeInt64(E)
E(ordinals[rand(ordinals.len - 1)])
proc rand(T: type QuotaUsage): T =
QuotaUsage(
used: rand(NBytes),
reserved: rand(NBytes)
)
proc rand(T: type BlockMetadata): T =
BlockMetadata(
expiry: rand(SecondsSince1970),
size: rand(NBytes),
refCount: rand(Natural)
)
proc rand(T: type DeleteResult): T =
DeleteResult(
kind: rand(DeleteResultKind),
released: rand(NBytes)
)
proc rand(T: type StoreResult): T =
StoreResult(
kind: rand(StoreResultKind),
used: rand(NBytes)
)
test "Natural encode/decode":
for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]:
check:
success(val) == Natural.decode(encode(val))
test "QuotaUsage encode/decode":
for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)):
check:
success(val) == QuotaUsage.decode(encode(val))
test "BlockMetadata encode/decode":
for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):
check:
success(val) == BlockMetadata.decode(encode(val))
test "DeleteResult encode/decode":
for val in newSeqWith[DeleteResult](100, rand(DeleteResult)):
check:
success(val) == DeleteResult.decode(encode(val))
test "StoreResult encode/decode":
for val in newSeqWith[StoreResult](100, rand(StoreResult)):
check:
success(val) == StoreResult.decode(encode(val))

View File

@ -34,10 +34,10 @@ checksuite "BlockMaintainer":
var testBe2: BlockExpiration var testBe2: BlockExpiration
var testBe3: BlockExpiration var testBe3: BlockExpiration
proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration =
BlockExpiration( BlockExpiration(
cid: bt.Block.example.cid, cid: bt.Block.example.cid,
expiration: expiration expiry: expiry
) )
setup: setup:
@ -186,4 +186,3 @@ checksuite "BlockMaintainer":
await invokeTimerManyTimes() await invokeTimerManyTimes()
# Second new block has expired # Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]

View File

@ -23,6 +23,8 @@ import ../helpers/mockclock
import ../examples import ../examples
import ./commonstoretests import ./commonstoretests
import ./repostore/testcoders
checksuite "Test RepoStore start/stop": checksuite "Test RepoStore start/stop":
var var
@ -34,24 +36,24 @@ checksuite "Test RepoStore start/stop":
metaDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet()
test "Should set started flag once started": test "Should set started flag once started":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
check repo.started check repo.started
test "Should set started flag to false once stopped": test "Should set started flag to false once stopped":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
await repo.stop() await repo.stop()
check not repo.started check not repo.started
test "Should allow start to be called multiple times": test "Should allow start to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
await repo.start() await repo.start()
check repo.started check repo.started
test "Should allow stop to be called multiple times": test "Should allow stop to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.stop() await repo.stop()
await repo.stop() await repo.stop()
check not repo.started check not repo.started
@ -73,7 +75,7 @@ asyncchecksuite "RepoStore":
mockClock = MockClock.new() mockClock = MockClock.new()
mockClock.set(now) mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200) repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb)
teardown: teardown:
(await repoDs.close()).tryGet (await repoDs.close()).tryGet
@ -85,117 +87,107 @@ asyncchecksuite "RepoStore":
test "Should update current used bytes on block put": test "Should update current used bytes on block put":
let blk = createTestBlock(200) let blk = createTestBlock(200)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check: check:
repo.quotaUsedBytes == 200 repo.quotaUsedBytes == 200'nb
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
test "Should update current used bytes on block delete": test "Should update current used bytes on block delete":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
(await repo.delBlock(blk.cid)).tryGet (await repo.delBlock(blk.cid)).tryGet
check: check:
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
test "Should not update current used bytes if block exist": test "Should not update current used bytes if block exist":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
# put again # put again
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
check:
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
test "Should fail storing passed the quota": test "Should fail storing passed the quota":
let blk = createTestBlock(300) let blk = createTestBlock(300)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
expect QuotaUsedError: expect QuotaNotEnoughError:
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
test "Should reserve bytes": test "Should reserve bytes":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check: check:
repo.totalUsed == 200 repo.totalUsed == 200'nb
repo.quotaUsedBytes == 100 repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 100 repo.quotaReservedBytes == 100'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
test "Should not reserve bytes over max quota": test "Should not reserve bytes over max quota":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
expect QuotaNotEnoughError: expect QuotaNotEnoughError:
(await repo.reserve(101)).tryGet (await repo.reserve(101'nb)).tryGet
check: check:
repo.totalUsed == 100 repo.totalUsed == 100'nb
repo.quotaUsedBytes == 100 repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 0 repo.quotaReservedBytes == 0'nb
expect DatastoreKeyNotFound:
discard (await metaDs.get(QuotaReservedKey)).tryGet
test "Should release bytes": test "Should release bytes":
discard createTestBlock(100) discard createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
(await repo.release(100)).tryGet (await repo.release(100'nb)).tryGet
check: check:
repo.totalUsed == 0 repo.totalUsed == 0'nb
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 0 repo.quotaReservedBytes == 0'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
test "Should not release bytes less than quota": test "Should not release bytes less than quota":
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
expect CatchableError: expect RangeDefect:
(await repo.release(101)).tryGet (await repo.release(101'nb)).tryGet
check: check:
repo.totalUsed == 100 repo.totalUsed == 100'nb
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 100 repo.quotaReservedBytes == 100'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = proc getExpirations(): Future[seq[BlockExpiration]] {.async.} =
let let iter = (await repo.getBlockExpirations(100, 0)).tryGet()
query = Query.init(key)
responseIter = (await metaDs.query(query)).tryGet var res = newSeq[BlockExpiration]()
response = (await allFinished(toSeq(responseIter))) for fut in iter:
.mapIt(it.read.tryGet) let be = await fut
.filterIt(it.key.isSome) res.add(be)
return response
res
test "Should store block expiration timestamp": test "Should store block expiration timestamp":
let let
@ -203,49 +195,40 @@ asyncchecksuite "RepoStore":
blk = createTestBlock(100) blk = createTestBlock(100)
let let
expectedExpiration: SecondsSince1970 = 123 + 10 expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, duration.some)).tryGet (await repo.putBlock(blk, duration.some)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
test "Should store block with default expiration timestamp when not provided": test "Should store block with default expiration timestamp when not provided":
let let
blk = createTestBlock(100) blk = createTestBlock(100)
let let
expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
test "Should refuse update expiry with negative timestamp": test "Should refuse update expiry with negative timestamp":
let let
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + 10 expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some 10.seconds)).tryGet (await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expect ValueError: expect ValueError:
(await repo.ensureExpiry(blk.cid, -1)).tryGet (await repo.ensureExpiry(blk.cid, -1)).tryGet
@ -262,56 +245,45 @@ asyncchecksuite "RepoStore":
test "Should update block expiration timestamp when new expiration is farther": test "Should update block expiration timestamp when new expiration is farther":
let let
duration = 10
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some duration.seconds)).tryGet (await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet (await repo.ensureExpiry(blk.cid, now + 20)).tryGet
response = await queryMetaDs(expectedKey) let updatedExpirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration notin updatedExpirations
!response[0].key == expectedKey updatedExpectedExpiration in updatedExpirations
response[0].data == updatedExpectedExpiration.toBytes
test "Should not update block expiration timestamp when current expiration is farther then new one": test "Should not update block expiration timestamp when current expiration is farther then new one":
let let
duration = 10
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some 10.seconds)).tryGet
(await repo.putBlock(blk, some duration.seconds)).tryGet let expirations = await getExpirations()
var response = await queryMetaDs(expectedKey)
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet (await repo.ensureExpiry(blk.cid, now + 5)).tryGet
response = await queryMetaDs(expectedKey) let updatedExpirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in updatedExpirations
!response[0].key == expectedKey updatedExpectedExpiration notin updatedExpirations
response[0].data == expectedExpiration.toBytes
test "delBlock should remove expiration metadata": test "delBlock should remove expiration metadata":
let let
@ -321,19 +293,19 @@ asyncchecksuite "RepoStore":
(await repo.putBlock(blk, 10.seconds.some)).tryGet (await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet (await repo.delBlock(blk.cid)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 0 expirations.len == 0
test "Should retrieve block expiration information": test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0) var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err: without iter =? (await beIter), err:
return expirations return expirations
for be in toSeq(iter): for beFut in toSeq(iter):
if value =? (await be): let value = await beFut
expirations.add(value) expirations.add(value)
return expirations return expirations
let let
@ -343,12 +315,12 @@ asyncchecksuite "RepoStore":
blk3 = createTestBlock(12) blk3 = createTestBlock(12)
let let
expectedExpiration: SecondsSince1970 = 123 + 10 expectedExpiration: SecondsSince1970 = now + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check: check:
be.cid == expectedBlock.cid be.cid == expectedBlock.cid
be.expiration == expectedExpiration be.expiry == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet (await repo.putBlock(blk1, duration.some)).tryGet

View File

@ -1,5 +1,6 @@
import std/sequtils import std/sequtils
from pkg/libp2p import `==` from pkg/libp2p import `==`
import pkg/codex/units
import ./twonodes import ./twonodes
twonodessuite "REST API", debug1 = false, debug2 = false: twonodessuite "REST API", debug1 = false, debug2 = false:
@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false:
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet() let space = client1.space().tryGet()
check: check:
space.totalBlocks == 2.uint space.totalBlocks == 2
space.quotaMaxBytes == 8589934592.uint space.quotaMaxBytes == 8589934592.NBytes
space.quotaUsedBytes == 65592.uint space.quotaUsedBytes == 65592.NBytes
space.quotaReservedBytes == 12.uint space.quotaReservedBytes == 12.NBytes
test "node lists local files": test "node lists local files":
let content1 = "some file contents" let content1 = "some file contents"