diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 936f568e..c3e67d9f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,7 @@ env: nim_version: v1.6.14 rust_version: 1.78.0 binary_base: codex + nim_flags: '-d:verify_circuit=true' upload_to_codex: false jobs: @@ -74,7 +75,7 @@ jobs: - name: Release - Build run: | - make NIMFLAGS="--out:${{ env.binary }}" + make NIMFLAGS="--out:${{ env.binary }} ${{ env.nim_flags }}" - name: Release - Upload binaries uses: actions/upload-artifact@v4 diff --git a/codex/codex.nim b/codex/codex.nim index fa9d0c13..0b9182fb 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -258,7 +258,7 @@ proc new*( repoDs = repoData, metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create metadata store!"), - quotaMaxBytes = config.storageQuota.uint, + quotaMaxBytes = config.storageQuota, blockTtl = config.blockTtl) maintenance = BlockMaintainer.new( diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index 27444522..d8eeba58 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -49,8 +49,8 @@ func getLinearIndicies( self.checkIteration(iteration) let - first = self.firstIndex + iteration * (self.step + 1) - last = min(first + self.step, self.lastIndex) + first = self.firstIndex + iteration * self.step + last = min(first + self.step - 1, self.lastIndex) getIter(first, last, 1) @@ -94,4 +94,4 @@ func init*( firstIndex: firstIndex, lastIndex: lastIndex, iterations: iterations, - step: divUp((lastIndex - firstIndex), iterations)) + step: divUp((lastIndex - firstIndex + 1), iterations)) diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 486a8fc3..abfd5b4c 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -135,13 +135,6 @@ func isManifest*(mc: MultiCodec): ?!bool = # Various sizes and verification ############################################################ -func bytes*(self: Manifest, pad = true): NBytes = - ## Compute how many bytes corresponding StoreStream(Manifest, pad) will return - if pad or self.protected: - self.blocksCount.NBytes * self.blockSize - else: - self.datasetSize - func rounded*(self: Manifest): int = ## Number of data blocks in *protected* manifest including padding at the end roundUp(self.originalBlocksCount, self.ecK) @@ -238,7 +231,7 @@ func new*( treeCid: Cid, datasetSize: NBytes, ecK, ecM: int, - strategy: StrategyType): Manifest = + strategy = SteppedStrategy): Manifest = ## Create an erasure protected dataset from an ## unprotected one ## @@ -284,7 +277,7 @@ func new*( ecM: int, originalTreeCid: Cid, originalDatasetSize: NBytes, - strategy: StrategyType): Manifest = + strategy = SteppedStrategy): Manifest = Manifest( treeCid: treeCid, @@ -306,7 +299,7 @@ func new*( verifyRoot: Cid, slotRoots: openArray[Cid], cellSize = DefaultCellSize, - strategy = SteppedStrategy): ?!Manifest = + strategy = LinearStrategy): ?!Manifest = ## Create a verifiable dataset from an ## protected one ## @@ -331,6 +324,7 @@ func new*( ecM: manifest.ecM, originalTreeCid: manifest.treeCid, originalDatasetSize: manifest.originalDatasetSize, + protectedStrategy: manifest.protectedStrategy, verifiable: true, verifyRoot: verifyRoot, slotRoots: @slotRoots, diff --git a/codex/merkletree/codex/coders.nim b/codex/merkletree/codex/coders.nim index 62e4f75b..a2d5a24b 100644 --- a/codex/merkletree/codex/coders.nim +++ b/codex/merkletree/codex/coders.nim @@ -14,6 +14,8 @@ push: {.upraises: [].} import pkg/libp2p import pkg/questionable import pkg/questionable/results +import pkg/stew/byteutils +import pkg/serde/json import ../../units import ../../errors @@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof = nodes.add node CodexProof.init(mcodec, index.int, nleaves.int, nodes) + +proc fromJson*( + _: type CodexProof, + json: JsonNode +): ?!CodexProof = + expectJsonKind(Cid, JString, json) + var bytes: seq[byte] + try: + bytes = hexToSeqByte(json.str) + except ValueError as err: + return failure(err) + + CodexProof.decode(bytes) + +func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode()) diff --git a/codex/node.nim b/codex/node.nim index dd76a717..15ca1466 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -240,14 +240,14 @@ proc streamEntireDataset( self: CodexNodeRef, manifest: Manifest, manifestCid: Cid, -): ?!LPStream = +): Future[?!LPStream] {.async.} = ## Streams the contents of the entire dataset described by the manifest. ## trace "Retrieving blocks from manifest", manifestCid if manifest.protected: # Retrieve, decode and save to the local store all EС groups - proc erasureJob(): Future[void] {.async.} = + proc erasureJob(): Future[?!void] {.async.} = try: # Spawn an erasure decoding job let @@ -258,10 +258,16 @@ proc streamEntireDataset( self.taskpool) without _ =? (await erasure.decode(manifest)), error: trace "Unable to erasure decode manifest", manifestCid, exc = error.msg - except CatchableError as exc: + # -------------------------------------------------------------------------- + # FIXME this is a HACK so that the node does not crash during the workshop. + # We should NOT catch Defect. + except Exception as exc: trace "Exception decoding manifest", manifestCid, exc = exc.msg + return failure(exc.msg) + # -------------------------------------------------------------------------- - asyncSpawn erasureJob() + if err =? (await erasureJob()).errorOption: + return failure(err) # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid @@ -283,7 +289,7 @@ proc retrieve*( return await self.streamSingleBlock(cid) - self.streamEntireDataset(manifest, cid) + await self.streamEntireDataset(manifest, cid) proc store*( self: CodexNodeRef, @@ -534,7 +540,9 @@ proc onStore( trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) - without builder =? Poseidon2Builder.new(self.networkStore, manifest), err: + without builder =? Poseidon2Builder.new( + self.networkStore, manifest, manifest.verifiableStrategy + ), err: trace "Unable to create slots builder", err = err.msg return failure(err) @@ -559,8 +567,8 @@ proc onStore( return success() - without indexer =? manifest.protectedStrategy.init( - 0, manifest.numSlotBlocks() - 1, manifest.numSlots).catch, err: + without indexer =? manifest.verifiableStrategy.init( + 0, manifest.blocksCount - 1, manifest.numSlots).catch, err: trace "Unable to create indexing strategy from protected manifest", err = err.msg return failure(err) diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 142af9d2..fba708be 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -7,6 +7,7 @@ import ../sales import ../purchasing import ../utils/json import ../manifest +import ../units export json @@ -65,10 +66,10 @@ type id*: NodeId RestRepoStore* = object - totalBlocks* {.serialize.}: uint - quotaMaxBytes* {.serialize.}: uint - quotaUsedBytes* {.serialize.}: uint - quotaReservedBytes* {.serialize.}: uint + totalBlocks* {.serialize.}: Natural + quotaMaxBytes* {.serialize.}: NBytes + quotaUsedBytes* {.serialize.}: NBytes + quotaReservedBytes* {.serialize.}: NBytes proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList = RestContentList( diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 8ba762ea..0b5eaaf5 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -46,6 +46,7 @@ import ../stores import ../market import ../contracts/requests import ../utils/json +import ../units export requests export logutils @@ -53,6 +54,7 @@ export logutils logScope: topics = "sales reservations" + type AvailabilityId* = distinct array[32, byte] ReservationId* = distinct array[32, byte] @@ -71,7 +73,8 @@ type size* {.serialize.}: UInt256 requestId* {.serialize.}: RequestId slotIndex* {.serialize.}: UInt256 - Reservations* = ref object + Reservations* = ref object of RootObj + availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability repo: RepoStore onAvailabilityAdded: ?OnAvailabilityAdded GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} @@ -95,12 +98,22 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc hash*(x: AvailabilityId): Hash {.borrow.} proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} +template withLock(lock, body) = + try: + await lock.acquire() + body + finally: + if lock.locked: + lock.release() + + proc new*(T: type Reservations, repo: RepoStore): Reservations = - T(repo: repo) + T(availabilityLock: newAsyncLock(),repo: repo) proc init*( _: type Availability, @@ -166,16 +179,16 @@ func key*(availability: Availability): ?!Key = func key*(reservation: Reservation): ?!Key = return key(reservation.id, reservation.availabilityId) -func available*(self: Reservations): uint = self.repo.available +func available*(self: Reservations): uint = self.repo.available.uint func hasAvailable*(self: Reservations, bytes: uint): bool = - self.repo.available(bytes) + self.repo.available(bytes.NBytes) proc exists*( self: Reservations, key: Key): Future[bool] {.async.} = - let exists = await self.repo.metaDs.contains(key) + let exists = await self.repo.metaDs.ds.contains(key) return exists proc getImpl( @@ -186,7 +199,7 @@ proc getImpl( let err = newException(NotExistsError, "object with key " & $key & " does not exist") return failure(err) - without serialized =? await self.repo.metaDs.get(key), error: + without serialized =? await self.repo.metaDs.ds.get(key), error: return failure(error.toErr(GetFailedError)) return success serialized @@ -213,7 +226,7 @@ proc updateImpl( without key =? obj.key, error: return failure(error) - if err =? (await self.repo.metaDs.put( + if err =? (await self.repo.metaDs.ds.put( key, @(obj.toJson.toBytes) )).errorOption: @@ -221,20 +234,19 @@ proc updateImpl( return success() -proc update*( - self: Reservations, - obj: Reservation): Future[?!void] {.async.} = - return await self.updateImpl(obj) - -proc update*( +proc updateAvailability( self: Reservations, obj: Availability): Future[?!void] {.async.} = + logScope: + availabilityId = obj.id + without key =? obj.key, error: return failure(error) without oldAvailability =? await self.get(key, Availability), err: if err of NotExistsError: + trace "Creating new Availability" let res = await self.updateImpl(obj) # inform subscribers that Availability has been added if onAvailabilityAdded =? self.onAvailabilityAdded: @@ -248,20 +260,20 @@ proc update*( except CatchableError as e: # we don't have any insight into types of exceptions that # `onAvailabilityAdded` can raise because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = obj.id, error = e.msg + warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg return res else: return failure(err) # Sizing of the availability changed, we need to adjust the repo reservation accordingly if oldAvailability.totalSize != obj.totalSize: + trace "totalSize changed, updating repo reservation" if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: + if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: + if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption: return failure(reserveErr.toErr(ReleaseFailedError)) let res = await self.updateImpl(obj) @@ -280,11 +292,21 @@ proc update*( except CatchableError as e: # we don't have any insight into types of exceptions that # `onAvailabilityAdded` can raise because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = obj.id, error = e.msg + warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg return res +proc update*( + self: Reservations, + obj: Reservation): Future[?!void] {.async.} = + return await self.updateImpl(obj) + +proc update*( + self: Reservations, + obj: Availability): Future[?!void] {.async.} = + withLock(self.availabilityLock): + return await self.updateAvailability(obj) + proc delete( self: Reservations, key: Key): Future[?!void] {.async.} = @@ -294,7 +316,7 @@ proc delete( if not await self.exists(key): return success() - if err =? (await self.repo.metaDs.delete(key)).errorOption: + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: return failure(err.toErr(DeleteFailedError)) return success() @@ -312,31 +334,32 @@ proc deleteReservation*( without key =? key(reservationId, availabilityId), error: return failure(error) - without reservation =? (await self.get(key, Reservation)), error: - if error of NotExistsError: - return success() - else: - return failure(error) + withLock(self.availabilityLock): + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) - if reservation.size > 0.u256: - trace "returning remaining reservation bytes to availability", - size = reservation.size + if reservation.size > 0.u256: + trace "returning remaining reservation bytes to availability", + size = reservation.size - without availabilityKey =? availabilityId.key, error: - return failure(error) + without availabilityKey =? availabilityId.key, error: + return failure(error) - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - availability.freeSize += reservation.size + availability.freeSize += reservation.size - if updateErr =? (await self.update(availability)).errorOption: - return failure(updateErr) + if updateErr =? (await self.updateAvailability(availability)).errorOption: + return failure(updateErr) - if err =? (await self.repo.metaDs.delete(key)).errorOption: - return failure(err.toErr(DeleteFailedError)) + if err =? (await self.repo.metaDs.ds.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) - return success() + return success() # TODO: add support for deleting availabilities # To delete, must not have any active sales. @@ -355,14 +378,14 @@ proc createAvailability*( ) let bytes = availability.freeSize.truncate(uint) - if reserveErr =? (await self.repo.reserve(bytes)).errorOption: + if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) if updateErr =? (await self.update(availability)).errorOption: # rollback the reserve trace "rolling back reserve" - if rollbackErr =? (await self.repo.release(bytes)).errorOption: + if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption: rollbackErr.parent = updateErr return failure(rollbackErr) @@ -370,54 +393,57 @@ proc createAvailability*( return success(availability) -proc createReservation*( +method createReservation*( self: Reservations, availabilityId: AvailabilityId, slotSize: UInt256, requestId: RequestId, slotIndex: UInt256 -): Future[?!Reservation] {.async.} = +): Future[?!Reservation] {.async, base.} = - trace "creating reservation", availabilityId, slotSize, requestId, slotIndex + withLock(self.availabilityLock): + without availabilityKey =? availabilityId.key, error: + return failure(error) - let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + without availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - without availabilityKey =? availabilityId.key, error: - return failure(error) + # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications + if availability.freeSize < slotSize: + let error = newException( + BytesOutOfBoundsError, + "trying to reserve an amount of bytes that is greater than the total size of the Availability") + return failure(error) - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex - if availability.freeSize < slotSize: - let error = newException( - BytesOutOfBoundsError, - "trying to reserve an amount of bytes that is greater than the total size of the Availability") - return failure(error) + let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) - if createResErr =? (await self.update(reservation)).errorOption: - return failure(createResErr) + if createResErr =? (await self.update(reservation)).errorOption: + return failure(createResErr) - # reduce availability freeSize by the slot size, which is now accounted for in - # the newly created Reservation - availability.freeSize -= slotSize + # reduce availability freeSize by the slot size, which is now accounted for in + # the newly created Reservation + availability.freeSize -= slotSize - # update availability with reduced size - if updateErr =? (await self.update(availability)).errorOption: + # update availability with reduced size + trace "Updating availability with reduced size" + if updateErr =? (await self.updateAvailability(availability)).errorOption: + trace "Updating availability failed, rolling back reservation creation" - trace "rolling back reservation creation" + without key =? reservation.key, keyError: + keyError.parent = updateErr + return failure(keyError) - without key =? reservation.key, keyError: - keyError.parent = updateErr - return failure(keyError) + # rollback the reservation creation + if rollbackErr =? (await self.delete(key)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) - # rollback the reservation creation - if rollbackErr =? (await self.delete(key)).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) + return failure(updateErr) - return failure(updateErr) - - return success(reservation) + trace "Reservation succesfully created" + return success(reservation) proc returnBytesToAvailability*( self: Reservations, @@ -429,48 +455,48 @@ proc returnBytesToAvailability*( reservationId availabilityId + withLock(self.availabilityLock): + without key =? key(reservationId, availabilityId), error: + return failure(error) - without key =? key(reservationId, availabilityId), error: - return failure(error) + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) - without var reservation =? (await self.get(key, Reservation)), error: - return failure(error) + # We are ignoring bytes that are still present in the Reservation because + # they will be returned to Availability through `deleteReservation`. + let bytesToBeReturned = bytes - reservation.size - # We are ignoring bytes that are still present in the Reservation because - # they will be returned to Availability through `deleteReservation`. - let bytesToBeReturned = bytes - reservation.size + if bytesToBeReturned == 0: + trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + return success() + + trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + + # First lets see if we can re-reserve the bytes, if the Repo's quota + # is depleted then we will fail-fast as there is nothing to be done atm. + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.freeSize += bytesToBeReturned + + # Update availability with returned size + if updateErr =? (await self.updateAvailability(availability)).errorOption: + + trace "Rolling back returning bytes" + if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) - if bytesToBeReturned == 0: - trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned return success() - trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned - - # First lets see if we can re-reserve the bytes, if the Repo's quota - # is depleted then we will fail-fast as there is nothing to be done atm. - if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - without availabilityKey =? availabilityId.key, error: - return failure(error) - - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) - - availability.freeSize += bytesToBeReturned - - # Update availability with returned size - if updateErr =? (await self.update(availability)).errorOption: - - trace "Rolling back returning bytes" - if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) - - return failure(updateErr) - - return success() - proc release*( self: Reservations, reservationId: ReservationId, @@ -497,7 +523,7 @@ proc release*( "trying to release an amount of bytes that is greater than the total size of the Reservation") return failure(error) - if releaseErr =? (await self.repo.release(bytes)).errorOption: + if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption: return failure(releaseErr.toErr(ReleaseFailedError)) reservation.size -= bytes.u256 @@ -507,7 +533,7 @@ proc release*( # rollback release if an update error encountered trace "rolling back release" - if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: + if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption: rollbackErr.parent = err return failure(rollbackErr) return failure(err) @@ -537,7 +563,7 @@ proc storables( else: raiseAssert "unknown type" - without results =? await self.repo.metaDs.query(query), error: + without results =? await self.repo.metaDs.ds.query(query), error: return failure(error) # /sales/reservations @@ -621,6 +647,7 @@ proc findAvailability*( minPrice >= availability.minPrice: trace "availability matched", + id = availability.id, size, availFreeSize = availability.freeSize, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, @@ -635,8 +662,8 @@ proc findAvailability*( return some availability trace "availability did not match", + id = availability.id, size, availFreeSize = availability.freeSize, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, collateral, availMaxCollateral = availability.maxCollateral - diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index 973446e2..e5a441d3 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -1,5 +1,6 @@ import pkg/questionable import pkg/questionable/results +import pkg/metrics import ../../logutils import ../../market @@ -13,6 +14,8 @@ import ./ignored import ./downloading import ./errored +declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch") + type SalePreparing* = ref object of ErrorHandlingState @@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = request.id, data.slotIndex ), error: + trace "Creation of reservation failed" + # Race condition: + # reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it. + # Should createReservation fail because there's no space, we proceed to SaleIgnored. + if error of BytesOutOfBoundsError: + # Lets monitor how often this happen and if it is often we can make it more inteligent to handle it + codex_reservations_availability_mismatch.inc() + return some State(SaleIgnored()) + return some State(SaleErrored(error: error)) + trace "Reservation created succesfully" + data.reservation = some reservation return some State(SaleDownloading()) diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 76193a53..63c6ba40 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} trace "Unable to delete block from repoStore" proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = - if be.expiration < self.clock.now: + if be.expiry < self.clock.now: await self.deleteExpiredBlock(be.cid) else: inc self.offset @@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = return var numberReceived = 0 - for maybeBeFuture in iter: - if be =? await maybeBeFuture: - inc numberReceived - await self.processBlockExpiration(be) - await sleepAsync(50.millis) + for beFut in iter: + let be = await beFut + inc numberReceived + await self.processBlockExpiration(be) + await sleepAsync(1.millis) # cooperative scheduling # If we received fewer blockExpirations from the iterator than we asked for, # We're at the end of the dataset and should start from 0 next time. diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 357e9401..7c51d215 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -6,7 +6,7 @@ import pkg/datastore/typedds import ../utils/asynciter -type KeyVal[T] = tuple[key: Key, value: T] +type KeyVal*[T] = tuple[key: Key, value: T] proc toAsyncIter*[T]( queryIter: QueryIter[T], diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index c129710f..5937cbfc 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -1,679 +1,5 @@ -## Nim-Codex -## Copyright (c) 2022 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. +import ./repostore/store +import ./repostore/types +import ./repostore/coders -import pkg/upraises - -push: {.upraises: [].} - -import pkg/chronos -import pkg/chronos/futures -import pkg/libp2p/[cid, multicodec, multihash] -import pkg/lrucache -import pkg/metrics -import pkg/questionable -import pkg/questionable/results -import pkg/datastore -import pkg/stew/endians2 - -import ./blockstore -import ./keyutils -import ../blocktype -import ../clock -import ../systemclock -import ../logutils -import ../merkletree -import ../utils - -export blocktype, cid - -logScope: - topics = "codex repostore" - -declareGauge(codex_repostore_blocks, "codex repostore blocks") -declareGauge(codex_repostore_bytes_used, "codex repostore bytes used") -declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved") - -const - DefaultBlockTtl* = 24.hours - DefaultQuotaBytes* = 1'u shl 33'u # ~8GB - -type - QuotaUsedError* = object of CodexError - QuotaNotEnoughError* = object of CodexError - - RepoStore* = ref object of BlockStore - postFixLen*: int - repoDs*: Datastore - metaDs*: Datastore - clock: Clock - totalBlocks*: uint # number of blocks in the store - quotaMaxBytes*: uint # maximum available bytes - quotaUsedBytes*: uint # bytes used by the repo - quotaReservedBytes*: uint # bytes reserved by the repo - blockTtl*: Duration - started*: bool - - BlockExpiration* = object - cid*: Cid - expiration*: SecondsSince1970 - -proc updateMetrics(self: RepoStore) = - codex_repostore_blocks.set(self.totalBlocks.int64) - codex_repostore_bytes_used.set(self.quotaUsedBytes.int64) - codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64) - -func totalUsed*(self: RepoStore): uint = - (self.quotaUsedBytes + self.quotaReservedBytes) - -func available*(self: RepoStore): uint = - return self.quotaMaxBytes - self.totalUsed - -func available*(self: RepoStore, bytes: uint): bool = - return bytes < self.available() - -proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] = - ## Encodes a tuple of cid and merkle proof in a following format: - ## | 8-bytes | n-bytes | remaining bytes | - ## | n | cid | proof | - ## - ## where n is a size of cid - ## - let - (cid, proof) = cidAndProof - cidBytes = cid.data.buffer - proofBytes = proof.encode - n = cidBytes.len - nBytes = n.uint64.toBytesBE - - @nBytes & cidBytes & proofBytes - -proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) = - let - n = uint64.fromBytesBE(data[0.. self.quotaMaxBytes: - error "Cannot store block, quota used!", used = self.totalUsed - return failure( - newException(QuotaUsedError, "Cannot store block, quota used!")) - - var - batch: seq[BatchEntry] - - let - used = self.quotaUsedBytes + blk.data.len.uint - - if err =? (await self.repoDs.put(key, blk.data)).errorOption: - error "Error storing block", err = err.msg - return failure(err) - - batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) - - without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: - warn "Unable to create block expiration metadata key", err = err.msg - return failure(err) - batch.add(blockExpEntry) - - if err =? (await self.metaDs.put(batch)).errorOption: - error "Error updating quota bytes", err = err.msg - - if err =? (await self.repoDs.delete(key)).errorOption: - error "Error deleting block after failed quota update", err = err.msg - return failure(err) - - return failure(err) - - self.quotaUsedBytes = used - inc self.totalBlocks - if isErr (await self.persistTotalBlocksCount()): - warn "Unable to update block total metadata" - return failure("Unable to update block total metadata") - - self.updateMetrics() - return success() - -proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} = - let used = self.quotaUsedBytes - blk.data.len.uint - if err =? (await self.metaDs.put( - QuotaUsedKey, - @(used.uint64.toBytesBE))).errorOption: - trace "Error updating quota key!", err = err.msg - return failure(err) - self.quotaUsedBytes = used - self.updateMetrics() - return success() - -proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - without key =? createBlockExpirationMetadataKey(cid), err: - return failure(err) - return await self.metaDs.delete(key) - -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore - ## - - logScope: - cid = cid - - trace "Deleting block" - - if cid.isEmpty: - trace "Empty block, ignoring" - return success() - - if blk =? (await self.getBlock(cid)): - if key =? makePrefixKey(self.postFixLen, cid) and - err =? (await self.repoDs.delete(key)).errorOption: - trace "Error deleting block!", err = err.msg - return failure(err) - - if isErr (await self.updateQuotaBytesUsed(blk)): - trace "Unable to update quote-bytes-used in metadata store" - return failure("Unable to update quote-bytes-used in metadata store") - - if isErr (await self.removeBlockExpirationEntry(blk.cid)): - trace "Unable to remove block expiration entry from metadata store" - return failure("Unable to remove block expiration entry from metadata store") - - trace "Deleted block", cid, totalUsed = self.totalUsed - - dec self.totalBlocks - if isErr (await self.persistTotalBlocksCount()): - trace "Unable to update block total metadata" - return failure("Unable to update block total metadata") - - self.updateMetrics() - return success() - -method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = - without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: - return failure(err) - - trace "Fetching proof", key - without value =? await self.metaDs.get(key), err: - if err of DatastoreKeyNotFound: - return success() - else: - return failure(err) - - without cid =? (Cid, CodexProof).decodeCid(value), err: - return failure(err) - - trace "Deleting block", cid - if err =? (await self.delBlock(cid)).errorOption: - return failure(err) - - await self.metaDs.delete(key) - -method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = - ## Check if the block exists in the blockstore - ## - - logScope: - cid = cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success true - - without key =? makePrefixKey(self.postFixLen, cid), err: - trace "Error getting key from provider", err = err.msg - return failure(err) - - return await self.repoDs.has(key) - -method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = - without cid =? await self.getCid(treeCid, index), err: - if err of BlockNotFoundError: - return success(false) - else: - return failure(err) - - await self.hasBlock(cid) - -method listBlocks*( - self: RepoStore, - blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] {.async.} = - ## Get the list of blocks in the RepoStore. - ## This is an intensive operation - ## - - var - iter = AsyncIter[?Cid]() - - let key = - case blockType: - of BlockType.Manifest: CodexManifestKey - of BlockType.Block: CodexBlocksKey - of BlockType.Both: CodexRepoKey - - let query = Query.init(key, value=false) - without queryIter =? (await self.repoDs.query(query)), err: - trace "Error querying cids in repo", blockType, err = err.msg - return failure(err) - - proc next(): Future[?Cid] {.async.} = - await idleAsync() - if queryIter.finished: - iter.finish - else: - if pair =? (await queryIter.next()) and cid =? pair.key: - doAssert pair.data.len == 0 - trace "Retrieved record from repo", cid - return Cid.init(cid.value).option - else: - return Cid.none - - iter.next = next - return success iter - -proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = - let queryKey = ? createBlockExpirationMetadataQueryKey() - success Query.init(queryKey, offset = offset, limit = maxNumber) - -method getBlockExpirations*( - self: RepoStore, - maxNumber: int, - offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} = - ## Get block expirations from the given RepoStore - ## - - without query =? createBlockExpirationQuery(maxNumber, offset), err: - trace "Unable to format block expirations query" - return failure(err) - - without queryIter =? (await self.metaDs.query(query)), err: - trace "Unable to execute block expirations query" - return failure(err) - - var iter = AsyncIter[?BlockExpiration]() - - proc next(): Future[?BlockExpiration] {.async.} = - if not queryIter.finished: - if pair =? (await queryIter.next()) and blockKey =? pair.key: - let expirationTimestamp = pair.data - let cidResult = Cid.init(blockKey.value) - if not cidResult.isOk: - raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error) - return BlockExpiration( - cid: cidResult.get, - expiration: expirationTimestamp.toSecondsSince1970 - ).some - else: - discard await queryIter.dispose() - iter.finish - return BlockExpiration.none - - iter.next = next - return success iter - -method close*(self: RepoStore): Future[void] {.async.} = - ## Close the blockstore, cleaning up resources managed by it. - ## For some implementations this may be a no-op - ## - - trace "Closing repostore" - - if not self.metaDs.isNil: - (await self.metaDs.close()).expect("Should meta datastore") - - if not self.repoDs.isNil: - (await self.repoDs.close()).expect("Should repo datastore") - -proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = - ## Reserve bytes - ## - - trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes - - if (self.totalUsed + bytes) > self.quotaMaxBytes: - trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes - return failure( - newException(QuotaNotEnoughError, "Not enough storage quota to reserver")) - - self.quotaReservedBytes += bytes - if err =? (await self.metaDs.put( - QuotaReservedKey, - @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: - - trace "Error reserving bytes", err = err.msg - - self.quotaReservedBytes += bytes - return failure(err) - - return success() - -proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = - ## Release bytes - ## - - trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes - - if (self.quotaReservedBytes.int - bytes.int) < 0: - trace "Cannot release this many bytes", - quotaReservedBytes = self.quotaReservedBytes, bytes - - return failure("Cannot release this many bytes") - - self.quotaReservedBytes -= bytes - if err =? (await self.metaDs.put( - QuotaReservedKey, - @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: - - trace "Error releasing bytes", err = err.msg - - self.quotaReservedBytes -= bytes - - return failure(err) - - trace "Released bytes", bytes - self.updateMetrics() - return success() - -proc start*(self: RepoStore): Future[void] {.async.} = - ## Start repo - ## - - if self.started: - trace "Repo already started" - return - - trace "Starting repo" - - without total =? await self.metaDs.get(CodexTotalBlocksKey), err: - if not (err of DatastoreKeyNotFound): - error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey - - if total.len > 0: - self.totalBlocks = uint64.fromBytesBE(total).uint - trace "Number of blocks in store at start", total = self.totalBlocks - - ## load current persist and cache bytes from meta ds - without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err: - if not (err of DatastoreKeyNotFound): - error "Error getting cache bytes from datastore", - err = err.msg, key = $QuotaUsedKey - - raise newException(Defect, err.msg) - - if quotaUsedBytes.len > 0: - self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint - - notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes - - without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err: - if not (err of DatastoreKeyNotFound): - error "Error getting persist bytes from datastore", - err = err.msg, key = $QuotaReservedKey - - raise newException(Defect, err.msg) - - if quotaReservedBytes.len > 0: - self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint - - if self.quotaUsedBytes > self.quotaMaxBytes: - raiseAssert "All storage quota used, increase storage quota!" - - notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes - - self.updateMetrics() - self.started = true - -proc stop*(self: RepoStore): Future[void] {.async.} = - ## Stop repo - ## - if not self.started: - trace "Repo is not started" - return - - trace "Stopping repo" - await self.close() - - self.started = false - -func new*( - T: type RepoStore, - repoDs: Datastore, - metaDs: Datastore, - clock: Clock = SystemClock.new(), - postFixLen = 2, - quotaMaxBytes = DefaultQuotaBytes, - blockTtl = DefaultBlockTtl -): RepoStore = - ## Create new instance of a RepoStore - ## - RepoStore( - repoDs: repoDs, - metaDs: metaDs, - clock: clock, - postFixLen: postFixLen, - quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl - ) +export store, types, coders diff --git a/codex/stores/repostore/coders.nim b/codex/stores/repostore/coders.nim new file mode 100644 index 00000000..6fc78408 --- /dev/null +++ b/codex/stores/repostore/coders.nim @@ -0,0 +1,47 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. +## + +import std/sugar +import pkg/libp2p/cid +import pkg/serde/json +import pkg/stew/byteutils +import pkg/stew/endians2 + +import ./types +import ../../errors +import ../../merkletree +import ../../utils/json + +proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes() +proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes() +proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes() +proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes() +proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes() +proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes) + +proc encode*(i: uint64): seq[byte] = + @(i.toBytesBE) + +proc decode*(T: type uint64, bytes: seq[byte]): ?!T = + if bytes.len >= sizeof(uint64): + success(uint64.fromBytesBE(bytes)) + else: + failure("Not enough bytes to decode `uint64`") + +proc encode*(i: Natural | enum): seq[byte] = cast[uint64](i).encode +proc decode*(T: typedesc[Natural | enum], bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[T](ui)) diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim new file mode 100644 index 00000000..e000bb0a --- /dev/null +++ b/codex/stores/repostore/operations.nim @@ -0,0 +1,213 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronos/futures +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/cid +import pkg/metrics +import pkg/questionable +import pkg/questionable/results + +import ./coders +import ./types +import ../blockstore +import ../keyutils +import ../../blocktype +import ../../clock +import ../../logutils +import ../../merkletree + +logScope: + topics = "codex repostore" + +declareGauge(codex_repostore_blocks, "codex repostore blocks") +declareGauge(codex_repostore_bytes_used, "codex repostore bytes used") +declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved") + +proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + await self.metaDs.modifyGet(key, + proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} = + var + md: LeafMetadata + res: StoreResultKind + + if currMd =? maybeCurrMd: + md = currMd + res = AlreadyInStore + else: + md = LeafMetadata(blkCid: blkCid, proof: proof) + res = Stored + + (md.some, res) + ) + +proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + without leafMd =? await get[LeafMetadata](self.metaDs, key), err: + if err of DatastoreKeyNotFound: + return failure(newException(BlockNotFoundError, err.msg)) + else: + return failure(err) + + success(leafMd) + +proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} = + await self.metaDs.modify(CodexTotalBlocksKey, + proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} = + let count: Natural = + if currCount =? maybeCurrCount: + currCount + plusCount - minusCount + else: + plusCount - minusCount + + self.totalBlocks = count + codex_repostore_blocks.set(count.int64) + count.some + ) + +proc updateQuotaUsage*( + self: RepoStore, + plusUsed: NBytes = 0.NBytes, + minusUsed: NBytes = 0.NBytes, + plusReserved: NBytes = 0.NBytes, + minusReserved: NBytes = 0.NBytes +): Future[?!void] {.async.} = + await self.metaDs.modify(QuotaUsedKey, + proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = + var usage: QuotaUsage + + if currUsage =? maybeCurrUsage: + usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved) + else: + usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved) + + if usage.used + usage.reserved > self.quotaMaxBytes: + raise newException(QuotaNotEnoughError, + "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " & + $usage.reserved & ", limit: " & $self.quotaMaxBytes) + else: + self.quotaUsage = usage + codex_repostore_bytes_used.set(usage.used.int64) + codex_repostore_bytes_reserved.set(usage.reserved.int64) + return usage.some + ) + +proc updateBlockMetadata*( + self: RepoStore, + cid: Cid, + plusRefCount: Natural = 0, + minusRefCount: Natural = 0, + minExpiry: SecondsSince1970 = 0 +): Future[?!void] {.async.} = + if cid.isEmpty: + return success() + + without metaKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + await self.metaDs.modify(metaKey, + proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} = + if currBlockMd =? maybeCurrBlockMd: + BlockMetadata( + size: currBlockMd.size, + expiry: max(currBlockMd.expiry, minExpiry), + refCount: currBlockMd.refCount + plusRefCount - minusRefCount + ).some + else: + raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found") + ) + +proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} = + if blk.isEmpty: + return success(StoreResult(kind: AlreadyInStore)) + + without metaKey =? createBlockExpirationMetadataKey(blk.cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} = + var + md: BlockMetadata + res: StoreResult + + if currMd =? maybeCurrMd: + if currMd.size == blk.data.len.NBytes: + md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount) + res = StoreResult(kind: AlreadyInStore) + + # making sure that the block acutally is stored in the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if not hasBlock: + warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + else: + raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid) + else: + md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0) + res = StoreResult(kind: Stored, used: blk.data.len.NBytes) + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + + (md.some, res) + ) + +proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} = + if cid.isEmpty: + return success(DeleteResult(kind: InUse)) + + without metaKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} = + var + maybeMeta: ?BlockMetadata + res: DeleteResult + + if currMd =? maybeCurrMd: + if currMd.refCount == 0 or currMd.expiry < expiryLimit: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: Deleted, released: currMd.size) + + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + else: + maybeMeta = currMd.some + res = DeleteResult(kind: InUse) + else: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: NotFound) + + # making sure that the block acutally is removed from the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if hasBlock: + warn "Block metadata is absent, but block is present. Removing block.", cid + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + + (maybeMeta, res) + ) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim new file mode 100644 index 00000000..7d629131 --- /dev/null +++ b/codex/stores/repostore/store.nim @@ -0,0 +1,395 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronos/futures +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/[cid, multicodec] +import pkg/questionable +import pkg/questionable/results + +import ./coders +import ./types +import ./operations +import ../blockstore +import ../keyutils +import ../queryiterhelper +import ../../blocktype +import ../../clock +import ../../logutils +import ../../merkletree +import ../../utils + +export blocktype, cid + +logScope: + topics = "codex repostore" + +########################################################### +# BlockStore API +########################################################### + +method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = + ## Get a block from the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return cid.emptyBlock + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + without data =? await self.repoDs.get(key), err: + if not (err of DatastoreKeyNotFound): + trace "Error getting block from datastore", err = err.msg, key + return failure(err) + + return failure(newException(BlockNotFoundError, err.msg)) + + trace "Got block for cid", cid + return Block.new(cid, data, verify = true) + +method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + without blk =? await self.getBlock(leafMd.blkCid), err: + return failure(err) + + success((blk, leafMd.proof)) + +method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.getBlock(leafMd.blkCid) + +method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = + ## Get a block from the blockstore + ## + + if address.leaf: + self.getBlock(address.treeCid, address.index) + else: + self.getBlock(address.cid) + +method ensureExpiry*( + self: RepoStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + if expiry <= 0: + return failure(newException(ValueError, "Expiry timestamp must be larger then zero")) + + await self.updateBlockMetadata(cid, minExpiry = expiry) + +method ensureExpiry*( + self: RepoStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.ensureExpiry(leafMd.blkCid, expiry) + +method putCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural, + blkCid: Cid, + proof: CodexProof +): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + logScope: + treeCid = treeCid + index = index + blkCid = blkCid + + trace "Storing LeafMetadata" + + without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err: + return failure(err) + + if blkCid.mcodec == BlockCodec: + if res == Stored: + if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption: + return failure(err) + trace "Leaf metadata stored, block refCount incremented" + else: + trace "Leaf metadata already exists" + + return success() + +method getCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!(Cid, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success((leafMd.blkCid, leafMd.proof)) + +method getCid*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!Cid] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success(leafMd.blkCid) + +method putBlock*( + self: RepoStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + logScope: + cid = blk.cid + + let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds + + without res =? await self.storeBlock(blk, expiry), err: + return failure(err) + + if res.kind == Stored: + trace "Block Stored" + if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption: + # rollback changes + without delRes =? await self.tryDeleteBlock(blk.cid), err: + return failure(err) + return failure(err) + + if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: + return failure(err) + else: + trace "Block already exists" + + return success() + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + trace "Attempting to delete a block" + + without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: + return failure(err) + + if res.kind == Deleted: + trace "Block deleted" + if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption: + return failure(err) + + if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: + return failure(err) + elif res.kind == InUse: + trace "Block in use, refCount > 0 and not expired" + else: + trace "Block not found in store" + + return success() + +method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success() + else: + return failure(err) + + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: + if not (err of BlockNotFoundError): + return failure(err) + + await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + +method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return success true + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + return await self.repoDs.has(key) + +method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success(false) + else: + return failure(err) + + await self.hasBlock(leafMd.blkCid) + +method listBlocks*( + self: RepoStore, + blockType = BlockType.Manifest +): Future[?!AsyncIter[?Cid]] {.async.} = + ## Get the list of blocks in the RepoStore. + ## This is an intensive operation + ## + + var + iter = AsyncIter[?Cid]() + + let key = + case blockType: + of BlockType.Manifest: CodexManifestKey + of BlockType.Block: CodexBlocksKey + of BlockType.Both: CodexRepoKey + + let query = Query.init(key, value=false) + without queryIter =? (await self.repoDs.query(query)), err: + trace "Error querying cids in repo", blockType, err = err.msg + return failure(err) + + proc next(): Future[?Cid] {.async.} = + await idleAsync() + if queryIter.finished: + iter.finish + else: + if pair =? (await queryIter.next()) and cid =? pair.key: + doAssert pair.data.len == 0 + trace "Retrieved record from repo", cid + return Cid.init(cid.value).option + else: + return Cid.none + + iter.next = next + return success iter + +proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = + let queryKey = ? createBlockExpirationMetadataQueryKey() + success Query.init(queryKey, offset = offset, limit = maxNumber) + +method getBlockExpirations*( + self: RepoStore, + maxNumber: int, + offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = + ## Get iterator with block expirations + ## + + without beQuery =? createBlockExpirationQuery(maxNumber, offset), err: + error "Unable to format block expirations query", err = err.msg + return failure(err) + + without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err: + error "Unable to execute block expirations query", err = err.msg + return failure(err) + + without asyncQueryIter =? await queryIter.toAsyncIter(), err: + error "Unable to convert QueryIter to AsyncIter", err = err.msg + return failure(err) + + let + filteredIter = await asyncQueryIter.filterSuccess() + blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, + proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} = + without cid =? Cid.init(kv.key.value).mapFailure, err: + error "Failed decoding cid", err = err.msg + return BlockExpiration.none + + BlockExpiration(cid: cid, expiry: kv.value.expiry).some + ) + + success(blockExpIter) + +method close*(self: RepoStore): Future[void] {.async.} = + ## Close the blockstore, cleaning up resources managed by it. + ## For some implementations this may be a no-op + ## + + trace "Closing repostore" + + if not self.metaDs.isNil: + (await self.metaDs.close()).expect("Should meta datastore") + + if not self.repoDs.isNil: + (await self.repoDs.close()).expect("Should repo datastore") + +########################################################### +# RepoStore procs +########################################################### + +proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Reserve bytes + ## + + trace "Reserving bytes", bytes + + await self.updateQuotaUsage(plusReserved = bytes) + +proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Release bytes + ## + + trace "Releasing bytes", bytes + + await self.updateQuotaUsage(minusReserved = bytes) + +proc start*(self: RepoStore): Future[void] {.async.} = + ## Start repo + ## + + if self.started: + trace "Repo already started" + return + + trace "Starting rep" + if err =? (await self.updateTotalBlocksCount()).errorOption: + raise newException(CodexError, err.msg) + + if err =? (await self.updateQuotaUsage()).errorOption: + raise newException(CodexError, err.msg) + + self.started = true + +proc stop*(self: RepoStore): Future[void] {.async.} = + ## Stop repo + ## + if not self.started: + trace "Repo is not started" + return + + trace "Stopping repo" + await self.close() + + self.started = false diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim new file mode 100644 index 00000000..4338e63a --- /dev/null +++ b/codex/stores/repostore/types.nim @@ -0,0 +1,107 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/cid + +import ../blockstore +import ../../clock +import ../../errors +import ../../merkletree +import ../../systemclock +import ../../units + +const + DefaultBlockTtl* = 24.hours + DefaultQuotaBytes* = 8.GiBs + +type + QuotaNotEnoughError* = object of CodexError + + RepoStore* = ref object of BlockStore + postFixLen*: int + repoDs*: Datastore + metaDs*: TypedDatastore + clock*: Clock + quotaMaxBytes*: NBytes + quotaUsage*: QuotaUsage + totalBlocks*: Natural + blockTtl*: Duration + started*: bool + + QuotaUsage* {.serialize.} = object + used*: NBytes + reserved*: NBytes + + BlockMetadata* {.serialize.} = object + expiry*: SecondsSince1970 + size*: NBytes + refCount*: Natural + + LeafMetadata* {.serialize.} = object + blkCid*: Cid + proof*: CodexProof + + BlockExpiration* {.serialize.} = object + cid*: Cid + expiry*: SecondsSince1970 + + DeleteResultKind* {.serialize.} = enum + Deleted = 0, # block removed from store + InUse = 1, # block not removed, refCount > 0 and not expired + NotFound = 2 # block not found in store + + DeleteResult* {.serialize.} = object + kind*: DeleteResultKind + released*: NBytes + + StoreResultKind* {.serialize.} = enum + Stored = 0, # new block stored + AlreadyInStore = 1 # block already in store + + StoreResult* {.serialize.} = object + kind*: StoreResultKind + used*: NBytes + +func quotaUsedBytes*(self: RepoStore): NBytes = + self.quotaUsage.used + +func quotaReservedBytes*(self: RepoStore): NBytes = + self.quotaUsage.reserved + +func totalUsed*(self: RepoStore): NBytes = + (self.quotaUsedBytes + self.quotaReservedBytes) + +func available*(self: RepoStore): NBytes = + return self.quotaMaxBytes - self.totalUsed + +func available*(self: RepoStore, bytes: NBytes): bool = + return bytes < self.available() + +func new*( + T: type RepoStore, + repoDs: Datastore, + metaDs: Datastore, + clock: Clock = SystemClock.new(), + postFixLen = 2, + quotaMaxBytes = DefaultQuotaBytes, + blockTtl = DefaultBlockTtl +): RepoStore = + ## Create new instance of a RepoStore + ## + RepoStore( + repoDs: repoDs, + metaDs: TypedDatastore.init(metaDs), + clock: clock, + postFixLen: postFixLen, + quotaMaxBytes: quotaMaxBytes, + blockTtl: blockTtl + ) diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index ce89171c..8a3b1a3c 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -38,7 +38,6 @@ type StoreStream* = ref object of SeekableStream store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs - pad*: bool # Pad last block to manifest.blockSize? method initStream*(s: StoreStream) = if s.objName.len == 0: @@ -57,13 +56,15 @@ proc new*( result = StoreStream( store: store, manifest: manifest, - pad: pad, offset: 0) result.initStream() method `size`*(self: StoreStream): int = - bytes(self.manifest, self.pad).int + ## The size of a StoreStream is the size of the original dataset, without + ## padding or parity blocks. + let m = self.manifest + (if m.protected: m.originalDatasetSize else: m.datasetSize).int proc `size=`*(self: StoreStream, size: int) {.error: "Setting the size is forbidden".} = diff --git a/codex/units.nim b/codex/units.nim index 63921d7d..52f44328 100644 --- a/codex/units.nim +++ b/codex/units.nim @@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes logutils.formatIt(NBytes): $it const - MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz + KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz + MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz + GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz +proc KiBs*(v: Natural): NBytes = v.NBytes * KiB proc MiBs*(v: Natural): NBytes = v.NBytes * MiB +proc GiBs*(v: Natural): NBytes = v.NBytes * GiB func divUp*[T: NBytes](a, b : T): int = ## Division with result rounded up (rather than truncated as in 'div') diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index fa49f878..86f881e0 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/sugar import pkg/chronos import pkg/libp2p import pkg/questionable @@ -24,33 +25,28 @@ type testBlockExpirations*: seq[BlockExpiration] getBlockExpirationsThrows*: bool - iteratorIndex: int method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = self.delBlockCids.add(cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) - dec self.iteratorIndex return success() -method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = +method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} = if self.getBlockExpirationsThrows: raise new CatchableError self.getBeMaxNumber = maxNumber self.getBeOffset = offset - var iter = AsyncIter[?BlockExpiration]() + let + testBlockExpirationsCpy = @(self.testBlockExpirations) + limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) - self.iteratorIndex = offset - var numberLeft = maxNumber - proc next(): Future[?BlockExpiration] {.async.} = - if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): - dec numberLeft - let selectedBlock = self.testBlockExpirations[self.iteratorIndex] - inc self.iteratorIndex - return selectedBlock.some - iter.finish - return BlockExpiration.none + let + iter1 = AsyncIter[int].new(offset.. orig check (updated.freeSize - orig) == 200.u256 - check (repo.quotaReservedBytes - origQuota) == 200 + check (repo.quotaReservedBytes - origQuota) == 200.NBytes test "update releases quota when lowering size": let @@ -220,7 +257,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize - 100 check isOk await reservations.update(availability) - check (origQuota - repo.quotaReservedBytes) == 100 + check (origQuota - repo.quotaReservedBytes) == 100.NBytes test "update reserves quota when growing size": let @@ -229,7 +266,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize + 100 check isOk await reservations.update(availability) - check (repo.quotaReservedBytes - origQuota) == 100 + check (repo.quotaReservedBytes - origQuota) == 100.NBytes test "reservation can be partially released": let availability = createAvailability() @@ -333,17 +370,17 @@ asyncchecksuite "Reservations module": check got.error of NotExistsError test "can get available bytes in repo": - check reservations.available == DefaultQuotaBytes + check reservations.available == DefaultQuotaBytes.uint test "reports quota available to be reserved": - check reservations.hasAvailable(DefaultQuotaBytes - 1) + check reservations.hasAvailable(DefaultQuotaBytes.uint - 1) test "reports quota not available to be reserved": - check not reservations.hasAvailable(DefaultQuotaBytes + 1) + check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1) test "fails to create availability with size that is larger than available quota": let created = await reservations.createAvailability( - (DefaultQuotaBytes + 1).u256, + (DefaultQuotaBytes.uint + 1).u256, UInt256.example, UInt256.example, UInt256.example diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index c3352cfa..543a7133 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -2,7 +2,7 @@ import std/sequtils import std/sugar import std/times import pkg/chronos -import pkg/datastore +import pkg/datastore/typedds import pkg/questionable import pkg/questionable/results import pkg/codex/sales diff --git a/tests/codex/sales/teststates.nim b/tests/codex/sales/teststates.nim index ef562136..73810134 100644 --- a/tests/codex/sales/teststates.nim +++ b/tests/codex/sales/teststates.nim @@ -6,5 +6,9 @@ import ./states/testinitialproving import ./states/testfilled import ./states/testproving import ./states/testsimulatedproving +import ./states/testcancelled +import ./states/testerrored +import ./states/testignored +import ./states/testpreparing {.warning[UnusedImport]: off.} diff --git a/tests/codex/stores/repostore/testcoders.nim b/tests/codex/stores/repostore/testcoders.nim new file mode 100644 index 00000000..47cf4097 --- /dev/null +++ b/tests/codex/stores/repostore/testcoders.nim @@ -0,0 +1,71 @@ +import std/unittest +import std/random + +import pkg/stew/objects +import pkg/questionable +import pkg/questionable/results + +import pkg/codex/clock +import pkg/codex/stores/repostore/types +import pkg/codex/stores/repostore/coders + +import ../../helpers + +checksuite "Test coders": + + proc rand(T: type NBytes): T = + rand(Natural).NBytes + + proc rand(E: type[enum]): E = + let ordinals = enumRangeInt64(E) + E(ordinals[rand(ordinals.len - 1)]) + + proc rand(T: type QuotaUsage): T = + QuotaUsage( + used: rand(NBytes), + reserved: rand(NBytes) + ) + + proc rand(T: type BlockMetadata): T = + BlockMetadata( + expiry: rand(SecondsSince1970), + size: rand(NBytes), + refCount: rand(Natural) + ) + + proc rand(T: type DeleteResult): T = + DeleteResult( + kind: rand(DeleteResultKind), + released: rand(NBytes) + ) + + proc rand(T: type StoreResult): T = + StoreResult( + kind: rand(StoreResultKind), + used: rand(NBytes) + ) + + test "Natural encode/decode": + for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]: + check: + success(val) == Natural.decode(encode(val)) + + test "QuotaUsage encode/decode": + for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)): + check: + success(val) == QuotaUsage.decode(encode(val)) + + test "BlockMetadata encode/decode": + for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)): + check: + success(val) == BlockMetadata.decode(encode(val)) + + test "DeleteResult encode/decode": + for val in newSeqWith[DeleteResult](100, rand(DeleteResult)): + check: + success(val) == DeleteResult.decode(encode(val)) + + test "StoreResult encode/decode": + for val in newSeqWith[StoreResult](100, rand(StoreResult)): + check: + success(val) == StoreResult.decode(encode(val)) diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim index c050f623..bdf48c12 100644 --- a/tests/codex/stores/testmaintenance.nim +++ b/tests/codex/stores/testmaintenance.nim @@ -34,10 +34,10 @@ checksuite "BlockMaintainer": var testBe2: BlockExpiration var testBe3: BlockExpiration - proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = + proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration = BlockExpiration( cid: bt.Block.example.cid, - expiration: expiration + expiry: expiry ) setup: @@ -186,4 +186,3 @@ checksuite "BlockMaintainer": await invokeTimerManyTimes() # Second new block has expired check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] - diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index e58eeb29..ecb3b75e 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -23,6 +23,8 @@ import ../helpers/mockclock import ../examples import ./commonstoretests +import ./repostore/testcoders + checksuite "Test RepoStore start/stop": var @@ -34,24 +36,24 @@ checksuite "Test RepoStore start/stop": metaDs = SQLiteDatastore.new(Memory).tryGet() test "Should set started flag once started": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() check repo.started test "Should set started flag to false once stopped": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.stop() check not repo.started test "Should allow start to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.start() check repo.started test "Should allow stop to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.stop() await repo.stop() check not repo.started @@ -73,7 +75,7 @@ asyncchecksuite "RepoStore": mockClock = MockClock.new() mockClock.set(now) - repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200) + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb) teardown: (await repoDs.close()).tryGet @@ -85,117 +87,107 @@ asyncchecksuite "RepoStore": test "Should update current used bytes on block put": let blk = createTestBlock(200) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet check: - repo.quotaUsedBytes == 200 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u + repo.quotaUsedBytes == 200'nb test "Should update current used bytes on block delete": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb (await repo.delBlock(blk.cid)).tryGet check: - repo.quotaUsedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u + repo.quotaUsedBytes == 0'nb test "Should not update current used bytes if block exist": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb # put again (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 - - check: - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u + check repo.quotaUsedBytes == 100'nb test "Should fail storing passed the quota": let blk = createTestBlock(300) - check repo.totalUsed == 0 - expect QuotaUsedError: + check repo.totalUsed == 0'nb + expect QuotaNotEnoughError: (await repo.putBlock(blk)).tryGet test "Should reserve bytes": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb - (await repo.reserve(100)).tryGet + (await repo.reserve(100'nb)).tryGet check: - repo.totalUsed == 200 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 200'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 100'nb test "Should not reserve bytes over max quota": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb expect QuotaNotEnoughError: - (await repo.reserve(101)).tryGet + (await repo.reserve(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 0 - - expect DatastoreKeyNotFound: - discard (await metaDs.get(QuotaReservedKey)).tryGet + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 0'nb test "Should release bytes": discard createTestBlock(100) - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - (await repo.release(100)).tryGet + (await repo.release(100'nb)).tryGet check: - repo.totalUsed == 0 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u + repo.totalUsed == 0'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 0'nb test "Should not release bytes less than quota": - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - expect CatchableError: - (await repo.release(101)).tryGet + expect RangeDefect: + (await repo.release(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 100'nb - proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = - let - query = Query.init(key) - responseIter = (await metaDs.query(query)).tryGet - response = (await allFinished(toSeq(responseIter))) - .mapIt(it.read.tryGet) - .filterIt(it.key.isSome) - return response + proc getExpirations(): Future[seq[BlockExpiration]] {.async.} = + let iter = (await repo.getBlockExpirations(100, 0)).tryGet() + + var res = newSeq[BlockExpiration]() + for fut in iter: + let be = await fut + res.add(be) + + res test "Should store block expiration timestamp": let @@ -203,49 +195,40 @@ asyncchecksuite "RepoStore": blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + 10 - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, duration.some)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should store block with default expiration timestamp when not provided": let blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds) (await repo.putBlock(blk)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should refuse update expiry with negative timestamp": let blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations expect ValueError: (await repo.ensureExpiry(blk.cid, -1)).tryGet @@ -262,56 +245,45 @@ asyncchecksuite "RepoStore": test "Should update block expiration timestamp when new expiration is farther": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20) - (await repo.putBlock(blk, some duration.seconds)).tryGet + (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 20)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == updatedExpectedExpiration.toBytes + expectedExpiration notin updatedExpirations + updatedExpectedExpiration in updatedExpirations test "Should not update block expiration timestamp when current expiration is farther then new one": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5) + (await repo.putBlock(blk, some 10.seconds)).tryGet - (await repo.putBlock(blk, some duration.seconds)).tryGet - - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 5)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in updatedExpirations + updatedExpectedExpiration notin updatedExpirations test "delBlock should remove expiration metadata": let @@ -321,19 +293,19 @@ asyncchecksuite "RepoStore": (await repo.putBlock(blk, 10.seconds.some)).tryGet (await repo.delBlock(blk.cid)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 0 + expirations.len == 0 test "Should retrieve block expiration information": - proc unpack(beIter: Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = + proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: return expirations - for be in toSeq(iter): - if value =? (await be): - expirations.add(value) + for beFut in toSeq(iter): + let value = await beFut + expirations.add(value) return expirations let @@ -343,12 +315,12 @@ asyncchecksuite "RepoStore": blk3 = createTestBlock(12) let - expectedExpiration: SecondsSince1970 = 123 + 10 + expectedExpiration: SecondsSince1970 = now + 10 proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = check: be.cid == expectedBlock.cid - be.expiration == expectedExpiration + be.expiry == expectedExpiration (await repo.putBlock(blk1, duration.some)).tryGet diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 8ca428e7..7a30259f 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -58,6 +58,14 @@ suite "Indexing strategies": expect IndexingWrongIterationsError: discard LinearStrategy.init(0, 10, 0) + test "should split elements evenly when possible": + let + l = LinearStrategy.init(0, 11, 3) + check: + toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it) + toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it) + toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it) + test "linear - oob": expect IndexingError: discard linear.getIndicies(3) diff --git a/tests/codex/testmanifest.nim b/tests/codex/testmanifest.nim index 188cc957..3393fa08 100644 --- a/tests/codex/testmanifest.nim +++ b/tests/codex/testmanifest.nim @@ -74,3 +74,36 @@ checksuite "Manifest": test "Should encode/decode to/from verifiable manifest": check: encodeDecode(verifiableManifest) == verifiableManifest + + +suite "Manifest - Attribute Inheritance": + proc makeProtectedManifest(strategy: StrategyType): Manifest = + Manifest.new( + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 1.MiBs, + datasetSize = 100.MiBs, + ), + treeCid = Cid.example, + datasetSize = 200.MiBs, + ecK = 1, + ecM = 1, + strategy = strategy + ) + + test "Should preserve interleaving strategy for protected manifest in verifiable manifest": + var verifiable = Manifest.new( + manifest = makeProtectedManifest(SteppedStrategy), + verifyRoot = Cid.example, + slotRoots = @[Cid.example, Cid.example] + ).tryGet() + + check verifiable.protectedStrategy == SteppedStrategy + + verifiable = Manifest.new( + manifest = makeProtectedManifest(LinearStrategy), + verifyRoot = Cid.example, + slotRoots = @[Cid.example, Cid.example] + ).tryGet() + + check verifiable.protectedStrategy == LinearStrategy diff --git a/tests/codex/teststorestream.nim b/tests/codex/teststorestream.nim index eaf92c1d..b717a8ec 100644 --- a/tests/codex/teststorestream.nim +++ b/tests/codex/teststorestream.nim @@ -1,12 +1,15 @@ import pkg/chronos import pkg/questionable/results -import pkg/codex/streams -import pkg/codex/stores -import pkg/codex/manifest -import pkg/codex/blocktype as bt +import pkg/codex/[ + streams, + stores, + indexingstrategy, + manifest, + blocktype as bt] import ../asynctest +import ./examples import ./helpers asyncchecksuite "StoreStream": @@ -99,3 +102,40 @@ asyncchecksuite "StoreStream": await stream.readExactly(addr buf[0], 15) check sequentialBytes(buf,15,0) + +suite "StoreStream - Size Tests": + + var stream: StoreStream + + teardown: + await stream.close() + + test "Should return dataset size as stream size": + let manifest = Manifest.new( + treeCid = Cid.example, + datasetSize = 80.NBytes, + blockSize = 10.NBytes + ) + + stream = StoreStream.new(CacheStore.new(), manifest) + + check stream.size == 80 + + test "Should not count parity/padding bytes as part of stream size": + let protectedManifest = Manifest.new( + treeCid = Cid.example, + datasetSize = 120.NBytes, # size including parity bytes + blockSize = 10.NBytes, + version = CIDv1, + hcodec = Sha256HashCodec, + codec = BlockCodec, + ecK = 2, + ecM = 1, + originalTreeCid = Cid.example, + originalDatasetSize = 80.NBytes, # size without parity bytes + strategy = StrategyType.SteppedStrategy + ) + + stream = StoreStream.new(CacheStore.new(), protectedManifest) + + check stream.size == 80 diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 74363f60..8c2c20e4 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,5 +1,6 @@ import std/sequtils from pkg/libp2p import `==` +import pkg/codex/units import ./twonodes twonodessuite "REST API", debug1 = false, debug2 = false: @@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false: discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get let space = client1.space().tryGet() check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint + space.totalBlocks == 2 + space.quotaMaxBytes == 8589934592.NBytes + space.quotaUsedBytes == 65592.NBytes + space.quotaReservedBytes == 12.NBytes test "node lists local files": let content1 = "some file contents"