From 5d4b971f03a76172a5a43c9517869d73c5479d8d Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 13 Dec 2023 16:25:25 +0100 Subject: [PATCH] Block deletion with ref count & repostore refactor --- codex/codex.nim | 2 +- codex/merkletree/codex/coders.nim | 17 + codex/rest/json.nim | 9 +- codex/sales/reservations.nim | 34 +- codex/stores/maintenance.nim | 12 +- codex/stores/queryiterhelper.nim | 2 +- codex/stores/repostore.nim | 682 +------------------------ codex/stores/repostore/coders.nim | 53 ++ codex/stores/repostore/operations.nim | 213 ++++++++ codex/stores/repostore/store.nim | 395 ++++++++++++++ codex/stores/repostore/types.nim | 107 ++++ codex/units.nim | 6 +- tests/codex/helpers/mockrepostore.nim | 28 +- tests/codex/node/helpers.nim | 1 - tests/codex/node/testcontracts.nim | 16 +- tests/codex/node/testnode.nim | 1 + tests/codex/sales/testreservations.nim | 18 +- tests/codex/sales/testsales.nim | 2 +- tests/codex/stores/testmaintenance.nim | 5 +- tests/codex/stores/testrepostore.nim | 202 ++++---- tests/integration/testrestapi.nim | 9 +- 21 files changed, 950 insertions(+), 864 deletions(-) create mode 100644 codex/stores/repostore/coders.nim create mode 100644 codex/stores/repostore/operations.nim create mode 100644 codex/stores/repostore/store.nim create mode 100644 codex/stores/repostore/types.nim diff --git a/codex/codex.nim b/codex/codex.nim index fa9d0c13..0b9182fb 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -258,7 +258,7 @@ proc new*( repoDs = repoData, metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create metadata store!"), - quotaMaxBytes = config.storageQuota.uint, + quotaMaxBytes = config.storageQuota, blockTtl = config.blockTtl) maintenance = BlockMaintainer.new( diff --git a/codex/merkletree/codex/coders.nim b/codex/merkletree/codex/coders.nim index 62e4f75b..a2d5a24b 100644 --- a/codex/merkletree/codex/coders.nim +++ b/codex/merkletree/codex/coders.nim @@ -14,6 +14,8 @@ push: {.upraises: [].} import pkg/libp2p import pkg/questionable import pkg/questionable/results +import pkg/stew/byteutils +import pkg/serde/json import ../../units import ../../errors @@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof = nodes.add node CodexProof.init(mcodec, index.int, nleaves.int, nodes) + +proc fromJson*( + _: type CodexProof, + json: JsonNode +): ?!CodexProof = + expectJsonKind(Cid, JString, json) + var bytes: seq[byte] + try: + bytes = hexToSeqByte(json.str) + except ValueError as err: + return failure(err) + + CodexProof.decode(bytes) + +func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode()) diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 142af9d2..fba708be 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -7,6 +7,7 @@ import ../sales import ../purchasing import ../utils/json import ../manifest +import ../units export json @@ -65,10 +66,10 @@ type id*: NodeId RestRepoStore* = object - totalBlocks* {.serialize.}: uint - quotaMaxBytes* {.serialize.}: uint - quotaUsedBytes* {.serialize.}: uint - quotaReservedBytes* {.serialize.}: uint + totalBlocks* {.serialize.}: Natural + quotaMaxBytes* {.serialize.}: NBytes + quotaUsedBytes* {.serialize.}: NBytes + quotaReservedBytes* {.serialize.}: NBytes proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList = RestContentList( diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 8ba762ea..caca409c 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -46,6 +46,7 @@ import ../stores import ../market import ../contracts/requests import ../utils/json +import ../units export requests export logutils @@ -166,16 +167,16 @@ func key*(availability: Availability): ?!Key = func key*(reservation: Reservation): ?!Key = return key(reservation.id, reservation.availabilityId) -func available*(self: Reservations): uint = self.repo.available +func available*(self: Reservations): uint = self.repo.available.uint func hasAvailable*(self: Reservations, bytes: uint): bool = - self.repo.available(bytes) + self.repo.available(bytes.NBytes) proc exists*( self: Reservations, key: Key): Future[bool] {.async.} = - let exists = await self.repo.metaDs.contains(key) + let exists = await self.repo.metaDs.ds.contains(key) return exists proc getImpl( @@ -186,7 +187,7 @@ proc getImpl( let err = newException(NotExistsError, "object with key " & $key & " does not exist") return failure(err) - without serialized =? await self.repo.metaDs.get(key), error: + without serialized =? await self.repo.metaDs.ds.get(key), error: return failure(error.toErr(GetFailedError)) return success serialized @@ -213,7 +214,7 @@ proc updateImpl( without key =? obj.key, error: return failure(error) - if err =? (await self.repo.metaDs.put( + if err =? (await self.repo.metaDs.ds.put( key, @(obj.toJson.toBytes) )).errorOption: @@ -257,11 +258,11 @@ proc update*( # Sizing of the availability changed, we need to adjust the repo reservation accordingly if oldAvailability.totalSize != obj.totalSize: if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption: return failure(reserveErr.toErr(ReleaseFailedError)) let res = await self.updateImpl(obj) @@ -294,7 +295,7 @@ proc delete( if not await self.exists(key): return success() - if err =? (await self.repo.metaDs.delete(key)).errorOption: + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: return failure(err.toErr(DeleteFailedError)) return success() @@ -333,7 +334,7 @@ proc deleteReservation*( if updateErr =? (await self.update(availability)).errorOption: return failure(updateErr) - if err =? (await self.repo.metaDs.delete(key)).errorOption: + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: return failure(err.toErr(DeleteFailedError)) return success() @@ -355,14 +356,14 @@ proc createAvailability*( ) let bytes = availability.freeSize.truncate(uint) - if reserveErr =? (await self.repo.reserve(bytes)).errorOption: + if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) if updateErr =? (await self.update(availability)).errorOption: # rollback the reserve trace "rolling back reserve" - if rollbackErr =? (await self.repo.release(bytes)).errorOption: + if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption: rollbackErr.parent = updateErr return failure(rollbackErr) @@ -448,7 +449,7 @@ proc returnBytesToAvailability*( # First lets see if we can re-reserve the bytes, if the Repo's quota # is depleted then we will fail-fast as there is nothing to be done atm. - if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) without availabilityKey =? availabilityId.key, error: @@ -463,7 +464,7 @@ proc returnBytesToAvailability*( if updateErr =? (await self.update(availability)).errorOption: trace "Rolling back returning bytes" - if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: + if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption: rollbackErr.parent = updateErr return failure(rollbackErr) @@ -497,7 +498,7 @@ proc release*( "trying to release an amount of bytes that is greater than the total size of the Reservation") return failure(error) - if releaseErr =? (await self.repo.release(bytes)).errorOption: + if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption: return failure(releaseErr.toErr(ReleaseFailedError)) reservation.size -= bytes.u256 @@ -507,7 +508,7 @@ proc release*( # rollback release if an update error encountered trace "rolling back release" - if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: + if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption: rollbackErr.parent = err return failure(rollbackErr) return failure(err) @@ -537,7 +538,7 @@ proc storables( else: raiseAssert "unknown type" - without results =? await self.repo.metaDs.query(query), error: + without results =? await self.repo.metaDs.ds.query(query), error: return failure(error) # /sales/reservations @@ -639,4 +640,3 @@ proc findAvailability*( duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral - diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 76193a53..63c6ba40 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} trace "Unable to delete block from repoStore" proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = - if be.expiration < self.clock.now: + if be.expiry < self.clock.now: await self.deleteExpiredBlock(be.cid) else: inc self.offset @@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = return var numberReceived = 0 - for maybeBeFuture in iter: - if be =? await maybeBeFuture: - inc numberReceived - await self.processBlockExpiration(be) - await sleepAsync(50.millis) + for beFut in iter: + let be = await beFut + inc numberReceived + await self.processBlockExpiration(be) + await sleepAsync(1.millis) # cooperative scheduling # If we received fewer blockExpirations from the iterator than we asked for, # We're at the end of the dataset and should start from 0 next time. diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 357e9401..7c51d215 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -6,7 +6,7 @@ import pkg/datastore/typedds import ../utils/asynciter -type KeyVal[T] = tuple[key: Key, value: T] +type KeyVal*[T] = tuple[key: Key, value: T] proc toAsyncIter*[T]( queryIter: QueryIter[T], diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index c129710f..5937cbfc 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -1,679 +1,5 @@ -## Nim-Codex -## Copyright (c) 2022 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. +import ./repostore/store +import ./repostore/types +import ./repostore/coders -import pkg/upraises - -push: {.upraises: [].} - -import pkg/chronos -import pkg/chronos/futures -import pkg/libp2p/[cid, multicodec, multihash] -import pkg/lrucache -import pkg/metrics -import pkg/questionable -import pkg/questionable/results -import pkg/datastore -import pkg/stew/endians2 - -import ./blockstore -import ./keyutils -import ../blocktype -import ../clock -import ../systemclock -import ../logutils -import ../merkletree -import ../utils - -export blocktype, cid - -logScope: - topics = "codex repostore" - -declareGauge(codex_repostore_blocks, "codex repostore blocks") -declareGauge(codex_repostore_bytes_used, "codex repostore bytes used") -declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved") - -const - DefaultBlockTtl* = 24.hours - DefaultQuotaBytes* = 1'u shl 33'u # ~8GB - -type - QuotaUsedError* = object of CodexError - QuotaNotEnoughError* = object of CodexError - - RepoStore* = ref object of BlockStore - postFixLen*: int - repoDs*: Datastore - metaDs*: Datastore - clock: Clock - totalBlocks*: uint # number of blocks in the store - quotaMaxBytes*: uint # maximum available bytes - quotaUsedBytes*: uint # bytes used by the repo - quotaReservedBytes*: uint # bytes reserved by the repo - blockTtl*: Duration - started*: bool - - BlockExpiration* = object - cid*: Cid - expiration*: SecondsSince1970 - -proc updateMetrics(self: RepoStore) = - codex_repostore_blocks.set(self.totalBlocks.int64) - codex_repostore_bytes_used.set(self.quotaUsedBytes.int64) - codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64) - -func totalUsed*(self: RepoStore): uint = - (self.quotaUsedBytes + self.quotaReservedBytes) - -func available*(self: RepoStore): uint = - return self.quotaMaxBytes - self.totalUsed - -func available*(self: RepoStore, bytes: uint): bool = - return bytes < self.available() - -proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] = - ## Encodes a tuple of cid and merkle proof in a following format: - ## | 8-bytes | n-bytes | remaining bytes | - ## | n | cid | proof | - ## - ## where n is a size of cid - ## - let - (cid, proof) = cidAndProof - cidBytes = cid.data.buffer - proofBytes = proof.encode - n = cidBytes.len - nBytes = n.uint64.toBytesBE - - @nBytes & cidBytes & proofBytes - -proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) = - let - n = uint64.fromBytesBE(data[0.. self.quotaMaxBytes: - error "Cannot store block, quota used!", used = self.totalUsed - return failure( - newException(QuotaUsedError, "Cannot store block, quota used!")) - - var - batch: seq[BatchEntry] - - let - used = self.quotaUsedBytes + blk.data.len.uint - - if err =? (await self.repoDs.put(key, blk.data)).errorOption: - error "Error storing block", err = err.msg - return failure(err) - - batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) - - without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: - warn "Unable to create block expiration metadata key", err = err.msg - return failure(err) - batch.add(blockExpEntry) - - if err =? (await self.metaDs.put(batch)).errorOption: - error "Error updating quota bytes", err = err.msg - - if err =? (await self.repoDs.delete(key)).errorOption: - error "Error deleting block after failed quota update", err = err.msg - return failure(err) - - return failure(err) - - self.quotaUsedBytes = used - inc self.totalBlocks - if isErr (await self.persistTotalBlocksCount()): - warn "Unable to update block total metadata" - return failure("Unable to update block total metadata") - - self.updateMetrics() - return success() - -proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} = - let used = self.quotaUsedBytes - blk.data.len.uint - if err =? (await self.metaDs.put( - QuotaUsedKey, - @(used.uint64.toBytesBE))).errorOption: - trace "Error updating quota key!", err = err.msg - return failure(err) - self.quotaUsedBytes = used - self.updateMetrics() - return success() - -proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - without key =? createBlockExpirationMetadataKey(cid), err: - return failure(err) - return await self.metaDs.delete(key) - -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore - ## - - logScope: - cid = cid - - trace "Deleting block" - - if cid.isEmpty: - trace "Empty block, ignoring" - return success() - - if blk =? (await self.getBlock(cid)): - if key =? makePrefixKey(self.postFixLen, cid) and - err =? (await self.repoDs.delete(key)).errorOption: - trace "Error deleting block!", err = err.msg - return failure(err) - - if isErr (await self.updateQuotaBytesUsed(blk)): - trace "Unable to update quote-bytes-used in metadata store" - return failure("Unable to update quote-bytes-used in metadata store") - - if isErr (await self.removeBlockExpirationEntry(blk.cid)): - trace "Unable to remove block expiration entry from metadata store" - return failure("Unable to remove block expiration entry from metadata store") - - trace "Deleted block", cid, totalUsed = self.totalUsed - - dec self.totalBlocks - if isErr (await self.persistTotalBlocksCount()): - trace "Unable to update block total metadata" - return failure("Unable to update block total metadata") - - self.updateMetrics() - return success() - -method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = - without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: - return failure(err) - - trace "Fetching proof", key - without value =? await self.metaDs.get(key), err: - if err of DatastoreKeyNotFound: - return success() - else: - return failure(err) - - without cid =? (Cid, CodexProof).decodeCid(value), err: - return failure(err) - - trace "Deleting block", cid - if err =? (await self.delBlock(cid)).errorOption: - return failure(err) - - await self.metaDs.delete(key) - -method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = - ## Check if the block exists in the blockstore - ## - - logScope: - cid = cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success true - - without key =? makePrefixKey(self.postFixLen, cid), err: - trace "Error getting key from provider", err = err.msg - return failure(err) - - return await self.repoDs.has(key) - -method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = - without cid =? await self.getCid(treeCid, index), err: - if err of BlockNotFoundError: - return success(false) - else: - return failure(err) - - await self.hasBlock(cid) - -method listBlocks*( - self: RepoStore, - blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] {.async.} = - ## Get the list of blocks in the RepoStore. - ## This is an intensive operation - ## - - var - iter = AsyncIter[?Cid]() - - let key = - case blockType: - of BlockType.Manifest: CodexManifestKey - of BlockType.Block: CodexBlocksKey - of BlockType.Both: CodexRepoKey - - let query = Query.init(key, value=false) - without queryIter =? (await self.repoDs.query(query)), err: - trace "Error querying cids in repo", blockType, err = err.msg - return failure(err) - - proc next(): Future[?Cid] {.async.} = - await idleAsync() - if queryIter.finished: - iter.finish - else: - if pair =? (await queryIter.next()) and cid =? pair.key: - doAssert pair.data.len == 0 - trace "Retrieved record from repo", cid - return Cid.init(cid.value).option - else: - return Cid.none - - iter.next = next - return success iter - -proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = - let queryKey = ? createBlockExpirationMetadataQueryKey() - success Query.init(queryKey, offset = offset, limit = maxNumber) - -method getBlockExpirations*( - self: RepoStore, - maxNumber: int, - offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} = - ## Get block expirations from the given RepoStore - ## - - without query =? createBlockExpirationQuery(maxNumber, offset), err: - trace "Unable to format block expirations query" - return failure(err) - - without queryIter =? (await self.metaDs.query(query)), err: - trace "Unable to execute block expirations query" - return failure(err) - - var iter = AsyncIter[?BlockExpiration]() - - proc next(): Future[?BlockExpiration] {.async.} = - if not queryIter.finished: - if pair =? (await queryIter.next()) and blockKey =? pair.key: - let expirationTimestamp = pair.data - let cidResult = Cid.init(blockKey.value) - if not cidResult.isOk: - raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error) - return BlockExpiration( - cid: cidResult.get, - expiration: expirationTimestamp.toSecondsSince1970 - ).some - else: - discard await queryIter.dispose() - iter.finish - return BlockExpiration.none - - iter.next = next - return success iter - -method close*(self: RepoStore): Future[void] {.async.} = - ## Close the blockstore, cleaning up resources managed by it. - ## For some implementations this may be a no-op - ## - - trace "Closing repostore" - - if not self.metaDs.isNil: - (await self.metaDs.close()).expect("Should meta datastore") - - if not self.repoDs.isNil: - (await self.repoDs.close()).expect("Should repo datastore") - -proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = - ## Reserve bytes - ## - - trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes - - if (self.totalUsed + bytes) > self.quotaMaxBytes: - trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes - return failure( - newException(QuotaNotEnoughError, "Not enough storage quota to reserver")) - - self.quotaReservedBytes += bytes - if err =? (await self.metaDs.put( - QuotaReservedKey, - @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: - - trace "Error reserving bytes", err = err.msg - - self.quotaReservedBytes += bytes - return failure(err) - - return success() - -proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = - ## Release bytes - ## - - trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes - - if (self.quotaReservedBytes.int - bytes.int) < 0: - trace "Cannot release this many bytes", - quotaReservedBytes = self.quotaReservedBytes, bytes - - return failure("Cannot release this many bytes") - - self.quotaReservedBytes -= bytes - if err =? (await self.metaDs.put( - QuotaReservedKey, - @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: - - trace "Error releasing bytes", err = err.msg - - self.quotaReservedBytes -= bytes - - return failure(err) - - trace "Released bytes", bytes - self.updateMetrics() - return success() - -proc start*(self: RepoStore): Future[void] {.async.} = - ## Start repo - ## - - if self.started: - trace "Repo already started" - return - - trace "Starting repo" - - without total =? await self.metaDs.get(CodexTotalBlocksKey), err: - if not (err of DatastoreKeyNotFound): - error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey - - if total.len > 0: - self.totalBlocks = uint64.fromBytesBE(total).uint - trace "Number of blocks in store at start", total = self.totalBlocks - - ## load current persist and cache bytes from meta ds - without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err: - if not (err of DatastoreKeyNotFound): - error "Error getting cache bytes from datastore", - err = err.msg, key = $QuotaUsedKey - - raise newException(Defect, err.msg) - - if quotaUsedBytes.len > 0: - self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint - - notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes - - without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err: - if not (err of DatastoreKeyNotFound): - error "Error getting persist bytes from datastore", - err = err.msg, key = $QuotaReservedKey - - raise newException(Defect, err.msg) - - if quotaReservedBytes.len > 0: - self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint - - if self.quotaUsedBytes > self.quotaMaxBytes: - raiseAssert "All storage quota used, increase storage quota!" - - notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes - - self.updateMetrics() - self.started = true - -proc stop*(self: RepoStore): Future[void] {.async.} = - ## Stop repo - ## - if not self.started: - trace "Repo is not started" - return - - trace "Stopping repo" - await self.close() - - self.started = false - -func new*( - T: type RepoStore, - repoDs: Datastore, - metaDs: Datastore, - clock: Clock = SystemClock.new(), - postFixLen = 2, - quotaMaxBytes = DefaultQuotaBytes, - blockTtl = DefaultBlockTtl -): RepoStore = - ## Create new instance of a RepoStore - ## - RepoStore( - repoDs: repoDs, - metaDs: metaDs, - clock: clock, - postFixLen: postFixLen, - quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl - ) +export store, types, coders diff --git a/codex/stores/repostore/coders.nim b/codex/stores/repostore/coders.nim new file mode 100644 index 00000000..7b3090e4 --- /dev/null +++ b/codex/stores/repostore/coders.nim @@ -0,0 +1,53 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. +## + +import std/sugar +import pkg/libp2p/cid +import pkg/serde/json +import pkg/stew/byteutils +import pkg/stew/endians2 + +import ./types +import ../../errors +import ../../merkletree +import ../../utils/json + +proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes() +proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes() +proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes() +proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes() +proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes() +proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(i: uint64): seq[byte] = + @(i.toBytesBE) + +proc decode*(T: type uint64, bytes: seq[byte]): ?!T = + if bytes.len >= sizeof(uint64): + success(uint64.fromBytesBE(bytes)) + else: + failure("Not enough bytes to decode `uint64`") + +proc encode*(i: int): seq[byte] = cast[uint64](i).encode +proc decode*(T: type int, bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[int](ui)) + +proc encode*[T: enum](e: T): seq[byte] = e.ord().encode +proc decode*(T: typedesc[enum], bytes: seq[byte]): ?!T = int.decode(bytes).map((ui: int) => T(ui)) + +proc encode*(i: Natural): seq[byte] = cast[int](i).encode +proc decode*(T: type Natural, bytes: seq[byte]): ?!T = int.decode(bytes).map((ui: int) => cast[Natural](ui)) diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim new file mode 100644 index 00000000..e000bb0a --- /dev/null +++ b/codex/stores/repostore/operations.nim @@ -0,0 +1,213 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronos/futures +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/cid +import pkg/metrics +import pkg/questionable +import pkg/questionable/results + +import ./coders +import ./types +import ../blockstore +import ../keyutils +import ../../blocktype +import ../../clock +import ../../logutils +import ../../merkletree + +logScope: + topics = "codex repostore" + +declareGauge(codex_repostore_blocks, "codex repostore blocks") +declareGauge(codex_repostore_bytes_used, "codex repostore bytes used") +declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved") + +proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + await self.metaDs.modifyGet(key, + proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} = + var + md: LeafMetadata + res: StoreResultKind + + if currMd =? maybeCurrMd: + md = currMd + res = AlreadyInStore + else: + md = LeafMetadata(blkCid: blkCid, proof: proof) + res = Stored + + (md.some, res) + ) + +proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + without leafMd =? await get[LeafMetadata](self.metaDs, key), err: + if err of DatastoreKeyNotFound: + return failure(newException(BlockNotFoundError, err.msg)) + else: + return failure(err) + + success(leafMd) + +proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} = + await self.metaDs.modify(CodexTotalBlocksKey, + proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} = + let count: Natural = + if currCount =? maybeCurrCount: + currCount + plusCount - minusCount + else: + plusCount - minusCount + + self.totalBlocks = count + codex_repostore_blocks.set(count.int64) + count.some + ) + +proc updateQuotaUsage*( + self: RepoStore, + plusUsed: NBytes = 0.NBytes, + minusUsed: NBytes = 0.NBytes, + plusReserved: NBytes = 0.NBytes, + minusReserved: NBytes = 0.NBytes +): Future[?!void] {.async.} = + await self.metaDs.modify(QuotaUsedKey, + proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = + var usage: QuotaUsage + + if currUsage =? maybeCurrUsage: + usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved) + else: + usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved) + + if usage.used + usage.reserved > self.quotaMaxBytes: + raise newException(QuotaNotEnoughError, + "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " & + $usage.reserved & ", limit: " & $self.quotaMaxBytes) + else: + self.quotaUsage = usage + codex_repostore_bytes_used.set(usage.used.int64) + codex_repostore_bytes_reserved.set(usage.reserved.int64) + return usage.some + ) + +proc updateBlockMetadata*( + self: RepoStore, + cid: Cid, + plusRefCount: Natural = 0, + minusRefCount: Natural = 0, + minExpiry: SecondsSince1970 = 0 +): Future[?!void] {.async.} = + if cid.isEmpty: + return success() + + without metaKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + await self.metaDs.modify(metaKey, + proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} = + if currBlockMd =? maybeCurrBlockMd: + BlockMetadata( + size: currBlockMd.size, + expiry: max(currBlockMd.expiry, minExpiry), + refCount: currBlockMd.refCount + plusRefCount - minusRefCount + ).some + else: + raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found") + ) + +proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} = + if blk.isEmpty: + return success(StoreResult(kind: AlreadyInStore)) + + without metaKey =? createBlockExpirationMetadataKey(blk.cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} = + var + md: BlockMetadata + res: StoreResult + + if currMd =? maybeCurrMd: + if currMd.size == blk.data.len.NBytes: + md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount) + res = StoreResult(kind: AlreadyInStore) + + # making sure that the block acutally is stored in the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if not hasBlock: + warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + else: + raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid) + else: + md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0) + res = StoreResult(kind: Stored, used: blk.data.len.NBytes) + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + + (md.some, res) + ) + +proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} = + if cid.isEmpty: + return success(DeleteResult(kind: InUse)) + + without metaKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} = + var + maybeMeta: ?BlockMetadata + res: DeleteResult + + if currMd =? maybeCurrMd: + if currMd.refCount == 0 or currMd.expiry < expiryLimit: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: Deleted, released: currMd.size) + + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + else: + maybeMeta = currMd.some + res = DeleteResult(kind: InUse) + else: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: NotFound) + + # making sure that the block acutally is removed from the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if hasBlock: + warn "Block metadata is absent, but block is present. Removing block.", cid + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + + (maybeMeta, res) + ) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim new file mode 100644 index 00000000..7d629131 --- /dev/null +++ b/codex/stores/repostore/store.nim @@ -0,0 +1,395 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronos/futures +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/[cid, multicodec] +import pkg/questionable +import pkg/questionable/results + +import ./coders +import ./types +import ./operations +import ../blockstore +import ../keyutils +import ../queryiterhelper +import ../../blocktype +import ../../clock +import ../../logutils +import ../../merkletree +import ../../utils + +export blocktype, cid + +logScope: + topics = "codex repostore" + +########################################################### +# BlockStore API +########################################################### + +method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = + ## Get a block from the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return cid.emptyBlock + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + without data =? await self.repoDs.get(key), err: + if not (err of DatastoreKeyNotFound): + trace "Error getting block from datastore", err = err.msg, key + return failure(err) + + return failure(newException(BlockNotFoundError, err.msg)) + + trace "Got block for cid", cid + return Block.new(cid, data, verify = true) + +method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + without blk =? await self.getBlock(leafMd.blkCid), err: + return failure(err) + + success((blk, leafMd.proof)) + +method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.getBlock(leafMd.blkCid) + +method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = + ## Get a block from the blockstore + ## + + if address.leaf: + self.getBlock(address.treeCid, address.index) + else: + self.getBlock(address.cid) + +method ensureExpiry*( + self: RepoStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + if expiry <= 0: + return failure(newException(ValueError, "Expiry timestamp must be larger then zero")) + + await self.updateBlockMetadata(cid, minExpiry = expiry) + +method ensureExpiry*( + self: RepoStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.ensureExpiry(leafMd.blkCid, expiry) + +method putCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural, + blkCid: Cid, + proof: CodexProof +): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + logScope: + treeCid = treeCid + index = index + blkCid = blkCid + + trace "Storing LeafMetadata" + + without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err: + return failure(err) + + if blkCid.mcodec == BlockCodec: + if res == Stored: + if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption: + return failure(err) + trace "Leaf metadata stored, block refCount incremented" + else: + trace "Leaf metadata already exists" + + return success() + +method getCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!(Cid, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success((leafMd.blkCid, leafMd.proof)) + +method getCid*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!Cid] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success(leafMd.blkCid) + +method putBlock*( + self: RepoStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + logScope: + cid = blk.cid + + let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds + + without res =? await self.storeBlock(blk, expiry), err: + return failure(err) + + if res.kind == Stored: + trace "Block Stored" + if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption: + # rollback changes + without delRes =? await self.tryDeleteBlock(blk.cid), err: + return failure(err) + return failure(err) + + if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: + return failure(err) + else: + trace "Block already exists" + + return success() + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + trace "Attempting to delete a block" + + without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: + return failure(err) + + if res.kind == Deleted: + trace "Block deleted" + if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption: + return failure(err) + + if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: + return failure(err) + elif res.kind == InUse: + trace "Block in use, refCount > 0 and not expired" + else: + trace "Block not found in store" + + return success() + +method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success() + else: + return failure(err) + + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: + if not (err of BlockNotFoundError): + return failure(err) + + await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + +method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return success true + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + return await self.repoDs.has(key) + +method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success(false) + else: + return failure(err) + + await self.hasBlock(leafMd.blkCid) + +method listBlocks*( + self: RepoStore, + blockType = BlockType.Manifest +): Future[?!AsyncIter[?Cid]] {.async.} = + ## Get the list of blocks in the RepoStore. + ## This is an intensive operation + ## + + var + iter = AsyncIter[?Cid]() + + let key = + case blockType: + of BlockType.Manifest: CodexManifestKey + of BlockType.Block: CodexBlocksKey + of BlockType.Both: CodexRepoKey + + let query = Query.init(key, value=false) + without queryIter =? (await self.repoDs.query(query)), err: + trace "Error querying cids in repo", blockType, err = err.msg + return failure(err) + + proc next(): Future[?Cid] {.async.} = + await idleAsync() + if queryIter.finished: + iter.finish + else: + if pair =? (await queryIter.next()) and cid =? pair.key: + doAssert pair.data.len == 0 + trace "Retrieved record from repo", cid + return Cid.init(cid.value).option + else: + return Cid.none + + iter.next = next + return success iter + +proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = + let queryKey = ? createBlockExpirationMetadataQueryKey() + success Query.init(queryKey, offset = offset, limit = maxNumber) + +method getBlockExpirations*( + self: RepoStore, + maxNumber: int, + offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = + ## Get iterator with block expirations + ## + + without beQuery =? createBlockExpirationQuery(maxNumber, offset), err: + error "Unable to format block expirations query", err = err.msg + return failure(err) + + without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err: + error "Unable to execute block expirations query", err = err.msg + return failure(err) + + without asyncQueryIter =? await queryIter.toAsyncIter(), err: + error "Unable to convert QueryIter to AsyncIter", err = err.msg + return failure(err) + + let + filteredIter = await asyncQueryIter.filterSuccess() + blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, + proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} = + without cid =? Cid.init(kv.key.value).mapFailure, err: + error "Failed decoding cid", err = err.msg + return BlockExpiration.none + + BlockExpiration(cid: cid, expiry: kv.value.expiry).some + ) + + success(blockExpIter) + +method close*(self: RepoStore): Future[void] {.async.} = + ## Close the blockstore, cleaning up resources managed by it. + ## For some implementations this may be a no-op + ## + + trace "Closing repostore" + + if not self.metaDs.isNil: + (await self.metaDs.close()).expect("Should meta datastore") + + if not self.repoDs.isNil: + (await self.repoDs.close()).expect("Should repo datastore") + +########################################################### +# RepoStore procs +########################################################### + +proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Reserve bytes + ## + + trace "Reserving bytes", bytes + + await self.updateQuotaUsage(plusReserved = bytes) + +proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Release bytes + ## + + trace "Releasing bytes", bytes + + await self.updateQuotaUsage(minusReserved = bytes) + +proc start*(self: RepoStore): Future[void] {.async.} = + ## Start repo + ## + + if self.started: + trace "Repo already started" + return + + trace "Starting rep" + if err =? (await self.updateTotalBlocksCount()).errorOption: + raise newException(CodexError, err.msg) + + if err =? (await self.updateQuotaUsage()).errorOption: + raise newException(CodexError, err.msg) + + self.started = true + +proc stop*(self: RepoStore): Future[void] {.async.} = + ## Stop repo + ## + if not self.started: + trace "Repo is not started" + return + + trace "Stopping repo" + await self.close() + + self.started = false diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim new file mode 100644 index 00000000..4338e63a --- /dev/null +++ b/codex/stores/repostore/types.nim @@ -0,0 +1,107 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/cid + +import ../blockstore +import ../../clock +import ../../errors +import ../../merkletree +import ../../systemclock +import ../../units + +const + DefaultBlockTtl* = 24.hours + DefaultQuotaBytes* = 8.GiBs + +type + QuotaNotEnoughError* = object of CodexError + + RepoStore* = ref object of BlockStore + postFixLen*: int + repoDs*: Datastore + metaDs*: TypedDatastore + clock*: Clock + quotaMaxBytes*: NBytes + quotaUsage*: QuotaUsage + totalBlocks*: Natural + blockTtl*: Duration + started*: bool + + QuotaUsage* {.serialize.} = object + used*: NBytes + reserved*: NBytes + + BlockMetadata* {.serialize.} = object + expiry*: SecondsSince1970 + size*: NBytes + refCount*: Natural + + LeafMetadata* {.serialize.} = object + blkCid*: Cid + proof*: CodexProof + + BlockExpiration* {.serialize.} = object + cid*: Cid + expiry*: SecondsSince1970 + + DeleteResultKind* {.serialize.} = enum + Deleted = 0, # block removed from store + InUse = 1, # block not removed, refCount > 0 and not expired + NotFound = 2 # block not found in store + + DeleteResult* {.serialize.} = object + kind*: DeleteResultKind + released*: NBytes + + StoreResultKind* {.serialize.} = enum + Stored = 0, # new block stored + AlreadyInStore = 1 # block already in store + + StoreResult* {.serialize.} = object + kind*: StoreResultKind + used*: NBytes + +func quotaUsedBytes*(self: RepoStore): NBytes = + self.quotaUsage.used + +func quotaReservedBytes*(self: RepoStore): NBytes = + self.quotaUsage.reserved + +func totalUsed*(self: RepoStore): NBytes = + (self.quotaUsedBytes + self.quotaReservedBytes) + +func available*(self: RepoStore): NBytes = + return self.quotaMaxBytes - self.totalUsed + +func available*(self: RepoStore, bytes: NBytes): bool = + return bytes < self.available() + +func new*( + T: type RepoStore, + repoDs: Datastore, + metaDs: Datastore, + clock: Clock = SystemClock.new(), + postFixLen = 2, + quotaMaxBytes = DefaultQuotaBytes, + blockTtl = DefaultBlockTtl +): RepoStore = + ## Create new instance of a RepoStore + ## + RepoStore( + repoDs: repoDs, + metaDs: TypedDatastore.init(metaDs), + clock: clock, + postFixLen: postFixLen, + quotaMaxBytes: quotaMaxBytes, + blockTtl: blockTtl + ) diff --git a/codex/units.nim b/codex/units.nim index 63921d7d..52f44328 100644 --- a/codex/units.nim +++ b/codex/units.nim @@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes logutils.formatIt(NBytes): $it const - MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz + KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz + MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz + GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz +proc KiBs*(v: Natural): NBytes = v.NBytes * KiB proc MiBs*(v: Natural): NBytes = v.NBytes * MiB +proc GiBs*(v: Natural): NBytes = v.NBytes * GiB func divUp*[T: NBytes](a, b : T): int = ## Division with result rounded up (rather than truncated as in 'div') diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index fa49f878..86f881e0 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/sugar import pkg/chronos import pkg/libp2p import pkg/questionable @@ -24,33 +25,28 @@ type testBlockExpirations*: seq[BlockExpiration] getBlockExpirationsThrows*: bool - iteratorIndex: int method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = self.delBlockCids.add(cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) - dec self.iteratorIndex return success() -method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = +method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} = if self.getBlockExpirationsThrows: raise new CatchableError self.getBeMaxNumber = maxNumber self.getBeOffset = offset - var iter = AsyncIter[?BlockExpiration]() + let + testBlockExpirationsCpy = @(self.testBlockExpirations) + limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) - self.iteratorIndex = offset - var numberLeft = maxNumber - proc next(): Future[?BlockExpiration] {.async.} = - if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): - dec numberLeft - let selectedBlock = self.testBlockExpirations[self.iteratorIndex] - inc self.iteratorIndex - return selectedBlock.some - iter.finish - return BlockExpiration.none + let + iter1 = AsyncIter[int].new(offset.. orig check (updated.freeSize - orig) == 200.u256 - check (repo.quotaReservedBytes - origQuota) == 200 + check (repo.quotaReservedBytes - origQuota) == 200.NBytes test "update releases quota when lowering size": let @@ -220,7 +220,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize - 100 check isOk await reservations.update(availability) - check (origQuota - repo.quotaReservedBytes) == 100 + check (origQuota - repo.quotaReservedBytes) == 100.NBytes test "update reserves quota when growing size": let @@ -229,7 +229,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize + 100 check isOk await reservations.update(availability) - check (repo.quotaReservedBytes - origQuota) == 100 + check (repo.quotaReservedBytes - origQuota) == 100.NBytes test "reservation can be partially released": let availability = createAvailability() @@ -333,17 +333,17 @@ asyncchecksuite "Reservations module": check got.error of NotExistsError test "can get available bytes in repo": - check reservations.available == DefaultQuotaBytes + check reservations.available == DefaultQuotaBytes.uint test "reports quota available to be reserved": - check reservations.hasAvailable(DefaultQuotaBytes - 1) + check reservations.hasAvailable(DefaultQuotaBytes.uint - 1) test "reports quota not available to be reserved": - check not reservations.hasAvailable(DefaultQuotaBytes + 1) + check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1) test "fails to create availability with size that is larger than available quota": let created = await reservations.createAvailability( - (DefaultQuotaBytes + 1).u256, + (DefaultQuotaBytes.uint + 1).u256, UInt256.example, UInt256.example, UInt256.example diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index c3352cfa..543a7133 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -2,7 +2,7 @@ import std/sequtils import std/sugar import std/times import pkg/chronos -import pkg/datastore +import pkg/datastore/typedds import pkg/questionable import pkg/questionable/results import pkg/codex/sales diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim index c050f623..bdf48c12 100644 --- a/tests/codex/stores/testmaintenance.nim +++ b/tests/codex/stores/testmaintenance.nim @@ -34,10 +34,10 @@ checksuite "BlockMaintainer": var testBe2: BlockExpiration var testBe3: BlockExpiration - proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = + proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration = BlockExpiration( cid: bt.Block.example.cid, - expiration: expiration + expiry: expiry ) setup: @@ -186,4 +186,3 @@ checksuite "BlockMaintainer": await invokeTimerManyTimes() # Second new block has expired check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] - diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index e58eeb29..46a705e3 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -34,24 +34,24 @@ checksuite "Test RepoStore start/stop": metaDs = SQLiteDatastore.new(Memory).tryGet() test "Should set started flag once started": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() check repo.started test "Should set started flag to false once stopped": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.stop() check not repo.started test "Should allow start to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.start() check repo.started test "Should allow stop to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.stop() await repo.stop() check not repo.started @@ -73,7 +73,7 @@ asyncchecksuite "RepoStore": mockClock = MockClock.new() mockClock.set(now) - repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200) + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb) teardown: (await repoDs.close()).tryGet @@ -85,117 +85,107 @@ asyncchecksuite "RepoStore": test "Should update current used bytes on block put": let blk = createTestBlock(200) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet check: - repo.quotaUsedBytes == 200 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u + repo.quotaUsedBytes == 200'nb test "Should update current used bytes on block delete": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb (await repo.delBlock(blk.cid)).tryGet check: - repo.quotaUsedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u + repo.quotaUsedBytes == 0'nb test "Should not update current used bytes if block exist": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb # put again (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 - - check: - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u + check repo.quotaUsedBytes == 100'nb test "Should fail storing passed the quota": let blk = createTestBlock(300) - check repo.totalUsed == 0 - expect QuotaUsedError: + check repo.totalUsed == 0'nb + expect QuotaNotEnoughError: (await repo.putBlock(blk)).tryGet test "Should reserve bytes": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb - (await repo.reserve(100)).tryGet + (await repo.reserve(100'nb)).tryGet check: - repo.totalUsed == 200 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 200'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 100'nb test "Should not reserve bytes over max quota": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb expect QuotaNotEnoughError: - (await repo.reserve(101)).tryGet + (await repo.reserve(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 0 - - expect DatastoreKeyNotFound: - discard (await metaDs.get(QuotaReservedKey)).tryGet + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 0'nb test "Should release bytes": discard createTestBlock(100) - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - (await repo.release(100)).tryGet + (await repo.release(100'nb)).tryGet check: - repo.totalUsed == 0 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u + repo.totalUsed == 0'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 0'nb test "Should not release bytes less than quota": - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - expect CatchableError: - (await repo.release(101)).tryGet + expect RangeDefect: + (await repo.release(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 100'nb - proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = - let - query = Query.init(key) - responseIter = (await metaDs.query(query)).tryGet - response = (await allFinished(toSeq(responseIter))) - .mapIt(it.read.tryGet) - .filterIt(it.key.isSome) - return response + proc getExpirations(): Future[seq[BlockExpiration]] {.async.} = + let iter = (await repo.getBlockExpirations(100, 0)).tryGet() + + var res = newSeq[BlockExpiration]() + for fut in iter: + let be = await fut + res.add(be) + + res test "Should store block expiration timestamp": let @@ -203,49 +193,40 @@ asyncchecksuite "RepoStore": blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + 10 - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, duration.some)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should store block with default expiration timestamp when not provided": let blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds) (await repo.putBlock(blk)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should refuse update expiry with negative timestamp": let blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations expect ValueError: (await repo.ensureExpiry(blk.cid, -1)).tryGet @@ -262,56 +243,45 @@ asyncchecksuite "RepoStore": test "Should update block expiration timestamp when new expiration is farther": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20) - (await repo.putBlock(blk, some duration.seconds)).tryGet + (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 20)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == updatedExpectedExpiration.toBytes + expectedExpiration notin updatedExpirations + updatedExpectedExpiration in updatedExpirations test "Should not update block expiration timestamp when current expiration is farther then new one": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5) + (await repo.putBlock(blk, some 10.seconds)).tryGet - (await repo.putBlock(blk, some duration.seconds)).tryGet - - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 5)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in updatedExpirations + updatedExpectedExpiration notin updatedExpirations test "delBlock should remove expiration metadata": let @@ -321,19 +291,19 @@ asyncchecksuite "RepoStore": (await repo.putBlock(blk, 10.seconds.some)).tryGet (await repo.delBlock(blk.cid)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 0 + expirations.len == 0 test "Should retrieve block expiration information": - proc unpack(beIter: Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = + proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: return expirations - for be in toSeq(iter): - if value =? (await be): - expirations.add(value) + for beFut in toSeq(iter): + let value = await beFut + expirations.add(value) return expirations let @@ -343,12 +313,12 @@ asyncchecksuite "RepoStore": blk3 = createTestBlock(12) let - expectedExpiration: SecondsSince1970 = 123 + 10 + expectedExpiration: SecondsSince1970 = now + 10 proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = check: be.cid == expectedBlock.cid - be.expiration == expectedExpiration + be.expiry == expectedExpiration (await repo.putBlock(blk1, duration.some)).tryGet diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 74363f60..8c2c20e4 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,5 +1,6 @@ import std/sequtils from pkg/libp2p import `==` +import pkg/codex/units import ./twonodes twonodessuite "REST API", debug1 = false, debug2 = false: @@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false: discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get let space = client1.space().tryGet() check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint + space.totalBlocks == 2 + space.quotaMaxBytes == 8589934592.NBytes + space.quotaUsedBytes == 65592.NBytes + space.quotaReservedBytes == 12.NBytes test "node lists local files": let content1 = "some file contents"