Merge latest master into release (#842)

* fix: createReservation lock (#825)

* fix: createReservation lock

* fix: additional locking places

* fix: acquire lock

* chore: feedback

Co-authored-by: markspanbroek <mark@spanbroek.net>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* feat: withLock template and fixed tests

* fix: use proc for MockReservations constructor

* chore: feedback

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* chore: feedback implementation

---------

Signed-off-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>

* Block deletion with ref count & repostore refactor (#631)

* Fix StoreStream so it doesn't return parity bytes  (#838)

* fix storestream so it doesn\'t return parity bits for protected/verifiable manifests

* use Cid.example instead of creating a mock manually

* Fix verifiable manifest initialization (#839)

* fix verifiable manifest initialization

* fix linearstrategy, use verifiableStrategy to select blocks for slots

* check for both strategies in attribute inheritance test

* ci: add verify_circuit=true to the releases (#840)

* provisional fix so EC errors do not crash the node on download (#841)

---------

Signed-off-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Co-authored-by: Tomasz Bekas <tomasz.bekas@gmail.com>
Co-authored-by: Giuliano Mega <giuliano.mega@gmail.com>
This commit is contained in:
Slava 2024-06-26 05:38:04 +03:00 committed by GitHub
parent 6e9bdf1d7e
commit 15ff87a8bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1415 additions and 991 deletions

View File

@ -11,6 +11,7 @@ env:
nim_version: v1.6.14 nim_version: v1.6.14
rust_version: 1.78.0 rust_version: 1.78.0
binary_base: codex binary_base: codex
nim_flags: '-d:verify_circuit=true'
upload_to_codex: false upload_to_codex: false
jobs: jobs:
@ -74,7 +75,7 @@ jobs:
- name: Release - Build - name: Release - Build
run: | run: |
make NIMFLAGS="--out:${{ env.binary }}" make NIMFLAGS="--out:${{ env.binary }} ${{ env.nim_flags }}"
- name: Release - Upload binaries - name: Release - Upload binaries
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4

View File

@ -258,7 +258,7 @@ proc new*(
repoDs = repoData, repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"), .expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota.uint, quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl) blockTtl = config.blockTtl)
maintenance = BlockMaintainer.new( maintenance = BlockMaintainer.new(

View File

@ -49,8 +49,8 @@ func getLinearIndicies(
self.checkIteration(iteration) self.checkIteration(iteration)
let let
first = self.firstIndex + iteration * (self.step + 1) first = self.firstIndex + iteration * self.step
last = min(first + self.step, self.lastIndex) last = min(first + self.step - 1, self.lastIndex)
getIter(first, last, 1) getIter(first, last, 1)
@ -94,4 +94,4 @@ func init*(
firstIndex: firstIndex, firstIndex: firstIndex,
lastIndex: lastIndex, lastIndex: lastIndex,
iterations: iterations, iterations: iterations,
step: divUp((lastIndex - firstIndex), iterations)) step: divUp((lastIndex - firstIndex + 1), iterations))

View File

@ -135,13 +135,6 @@ func isManifest*(mc: MultiCodec): ?!bool =
# Various sizes and verification # Various sizes and verification
############################################################ ############################################################
func bytes*(self: Manifest, pad = true): NBytes =
## Compute how many bytes corresponding StoreStream(Manifest, pad) will return
if pad or self.protected:
self.blocksCount.NBytes * self.blockSize
else:
self.datasetSize
func rounded*(self: Manifest): int = func rounded*(self: Manifest): int =
## Number of data blocks in *protected* manifest including padding at the end ## Number of data blocks in *protected* manifest including padding at the end
roundUp(self.originalBlocksCount, self.ecK) roundUp(self.originalBlocksCount, self.ecK)
@ -238,7 +231,7 @@ func new*(
treeCid: Cid, treeCid: Cid,
datasetSize: NBytes, datasetSize: NBytes,
ecK, ecM: int, ecK, ecM: int,
strategy: StrategyType): Manifest = strategy = SteppedStrategy): Manifest =
## Create an erasure protected dataset from an ## Create an erasure protected dataset from an
## unprotected one ## unprotected one
## ##
@ -284,7 +277,7 @@ func new*(
ecM: int, ecM: int,
originalTreeCid: Cid, originalTreeCid: Cid,
originalDatasetSize: NBytes, originalDatasetSize: NBytes,
strategy: StrategyType): Manifest = strategy = SteppedStrategy): Manifest =
Manifest( Manifest(
treeCid: treeCid, treeCid: treeCid,
@ -306,7 +299,7 @@ func new*(
verifyRoot: Cid, verifyRoot: Cid,
slotRoots: openArray[Cid], slotRoots: openArray[Cid],
cellSize = DefaultCellSize, cellSize = DefaultCellSize,
strategy = SteppedStrategy): ?!Manifest = strategy = LinearStrategy): ?!Manifest =
## Create a verifiable dataset from an ## Create a verifiable dataset from an
## protected one ## protected one
## ##
@ -331,6 +324,7 @@ func new*(
ecM: manifest.ecM, ecM: manifest.ecM,
originalTreeCid: manifest.treeCid, originalTreeCid: manifest.treeCid,
originalDatasetSize: manifest.originalDatasetSize, originalDatasetSize: manifest.originalDatasetSize,
protectedStrategy: manifest.protectedStrategy,
verifiable: true, verifiable: true,
verifyRoot: verifyRoot, verifyRoot: verifyRoot,
slotRoots: @slotRoots, slotRoots: @slotRoots,

View File

@ -14,6 +14,8 @@ push: {.upraises: [].}
import pkg/libp2p import pkg/libp2p
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/byteutils
import pkg/serde/json
import ../../units import ../../units
import ../../errors import ../../errors
@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof =
nodes.add node nodes.add node
CodexProof.init(mcodec, index.int, nleaves.int, nodes) CodexProof.init(mcodec, index.int, nleaves.int, nodes)
proc fromJson*(
_: type CodexProof,
json: JsonNode
): ?!CodexProof =
expectJsonKind(Cid, JString, json)
var bytes: seq[byte]
try:
bytes = hexToSeqByte(json.str)
except ValueError as err:
return failure(err)
CodexProof.decode(bytes)
func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode())

View File

@ -240,14 +240,14 @@ proc streamEntireDataset(
self: CodexNodeRef, self: CodexNodeRef,
manifest: Manifest, manifest: Manifest,
manifestCid: Cid, manifestCid: Cid,
): ?!LPStream = ): Future[?!LPStream] {.async.} =
## Streams the contents of the entire dataset described by the manifest. ## Streams the contents of the entire dataset described by the manifest.
## ##
trace "Retrieving blocks from manifest", manifestCid trace "Retrieving blocks from manifest", manifestCid
if manifest.protected: if manifest.protected:
# Retrieve, decode and save to the local store all EС groups # Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async.} = proc erasureJob(): Future[?!void] {.async.} =
try: try:
# Spawn an erasure decoding job # Spawn an erasure decoding job
let let
@ -258,10 +258,16 @@ proc streamEntireDataset(
self.taskpool) self.taskpool)
without _ =? (await erasure.decode(manifest)), error: without _ =? (await erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", manifestCid, exc = error.msg trace "Unable to erasure decode manifest", manifestCid, exc = error.msg
except CatchableError as exc: # --------------------------------------------------------------------------
# FIXME this is a HACK so that the node does not crash during the workshop.
# We should NOT catch Defect.
except Exception as exc:
trace "Exception decoding manifest", manifestCid, exc = exc.msg trace "Exception decoding manifest", manifestCid, exc = exc.msg
return failure(exc.msg)
# --------------------------------------------------------------------------
asyncSpawn erasureJob() if err =? (await erasureJob()).errorOption:
return failure(err)
# Retrieve all blocks of the dataset sequentially from the local store or network # Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid trace "Creating store stream for manifest", manifestCid
@ -283,7 +289,7 @@ proc retrieve*(
return await self.streamSingleBlock(cid) return await self.streamSingleBlock(cid)
self.streamEntireDataset(manifest, cid) await self.streamEntireDataset(manifest, cid)
proc store*( proc store*(
self: CodexNodeRef, self: CodexNodeRef,
@ -534,7 +540,9 @@ proc onStore(
trace "Unable to fetch manifest for cid", cid, err = err.msg trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err) return failure(err)
without builder =? Poseidon2Builder.new(self.networkStore, manifest), err: without builder =? Poseidon2Builder.new(
self.networkStore, manifest, manifest.verifiableStrategy
), err:
trace "Unable to create slots builder", err = err.msg trace "Unable to create slots builder", err = err.msg
return failure(err) return failure(err)
@ -559,8 +567,8 @@ proc onStore(
return success() return success()
without indexer =? manifest.protectedStrategy.init( without indexer =? manifest.verifiableStrategy.init(
0, manifest.numSlotBlocks() - 1, manifest.numSlots).catch, err: 0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err) return failure(err)

View File

@ -7,6 +7,7 @@ import ../sales
import ../purchasing import ../purchasing
import ../utils/json import ../utils/json
import ../manifest import ../manifest
import ../units
export json export json
@ -65,10 +66,10 @@ type
id*: NodeId id*: NodeId
RestRepoStore* = object RestRepoStore* = object
totalBlocks* {.serialize.}: uint totalBlocks* {.serialize.}: Natural
quotaMaxBytes* {.serialize.}: uint quotaMaxBytes* {.serialize.}: NBytes
quotaUsedBytes* {.serialize.}: uint quotaUsedBytes* {.serialize.}: NBytes
quotaReservedBytes* {.serialize.}: uint quotaReservedBytes* {.serialize.}: NBytes
proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList = proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList =
RestContentList( RestContentList(

View File

@ -46,6 +46,7 @@ import ../stores
import ../market import ../market
import ../contracts/requests import ../contracts/requests
import ../utils/json import ../utils/json
import ../units
export requests export requests
export logutils export logutils
@ -53,6 +54,7 @@ export logutils
logScope: logScope:
topics = "sales reservations" topics = "sales reservations"
type type
AvailabilityId* = distinct array[32, byte] AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte] ReservationId* = distinct array[32, byte]
@ -71,7 +73,8 @@ type
size* {.serialize.}: UInt256 size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256 slotIndex* {.serialize.}: UInt256
Reservations* = ref object Reservations* = ref object of RootObj
availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
repo: RepoStore repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
@ -95,12 +98,22 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet ReservationsKey = (SalesKey / "reservations").tryGet
proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
template withLock(lock, body) =
try:
await lock.acquire()
body
finally:
if lock.locked:
lock.release()
proc new*(T: type Reservations, proc new*(T: type Reservations,
repo: RepoStore): Reservations = repo: RepoStore): Reservations =
T(repo: repo) T(availabilityLock: newAsyncLock(),repo: repo)
proc init*( proc init*(
_: type Availability, _: type Availability,
@ -166,16 +179,16 @@ func key*(availability: Availability): ?!Key =
func key*(reservation: Reservation): ?!Key = func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId) return key(reservation.id, reservation.availabilityId)
func available*(self: Reservations): uint = self.repo.available func available*(self: Reservations): uint = self.repo.available.uint
func hasAvailable*(self: Reservations, bytes: uint): bool = func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes) self.repo.available(bytes.NBytes)
proc exists*( proc exists*(
self: Reservations, self: Reservations,
key: Key): Future[bool] {.async.} = key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key) let exists = await self.repo.metaDs.ds.contains(key)
return exists return exists
proc getImpl( proc getImpl(
@ -186,7 +199,7 @@ proc getImpl(
let err = newException(NotExistsError, "object with key " & $key & " does not exist") let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err) return failure(err)
without serialized =? await self.repo.metaDs.get(key), error: without serialized =? await self.repo.metaDs.ds.get(key), error:
return failure(error.toErr(GetFailedError)) return failure(error.toErr(GetFailedError))
return success serialized return success serialized
@ -213,7 +226,7 @@ proc updateImpl(
without key =? obj.key, error: without key =? obj.key, error:
return failure(error) return failure(error)
if err =? (await self.repo.metaDs.put( if err =? (await self.repo.metaDs.ds.put(
key, key,
@(obj.toJson.toBytes) @(obj.toJson.toBytes)
)).errorOption: )).errorOption:
@ -221,20 +234,19 @@ proc updateImpl(
return success() return success()
proc update*( proc updateAvailability(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
self: Reservations, self: Reservations,
obj: Availability): Future[?!void] {.async.} = obj: Availability): Future[?!void] {.async.} =
logScope:
availabilityId = obj.id
without key =? obj.key, error: without key =? obj.key, error:
return failure(error) return failure(error)
without oldAvailability =? await self.get(key, Availability), err: without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError: if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj) let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added # inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded: if onAvailabilityAdded =? self.onAvailabilityAdded:
@ -248,20 +260,20 @@ proc update*(
except CatchableError as e: except CatchableError as e:
# we don't have any insight into types of exceptions that # we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined # `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback", warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
availabilityId = obj.id, error = e.msg
return res return res
else: else:
return failure(err) return failure(err)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly # Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize: if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError)) return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj) let res = await self.updateImpl(obj)
@ -280,11 +292,21 @@ proc update*(
except CatchableError as e: except CatchableError as e:
# we don't have any insight into types of exceptions that # we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined # `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback", warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
availabilityId = obj.id, error = e.msg
return res return res
proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
proc delete( proc delete(
self: Reservations, self: Reservations,
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async.} =
@ -294,7 +316,7 @@ proc delete(
if not await self.exists(key): if not await self.exists(key):
return success() return success()
if err =? (await self.repo.metaDs.delete(key)).errorOption: if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError)) return failure(err.toErr(DeleteFailedError))
return success() return success()
@ -312,6 +334,7 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error: without key =? key(reservationId, availabilityId), error:
return failure(error) return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error: without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError: if error of NotExistsError:
return success() return success()
@ -330,10 +353,10 @@ proc deleteReservation*(
availability.freeSize += reservation.size availability.freeSize += reservation.size
if updateErr =? (await self.update(availability)).errorOption: if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr) return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption: if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError)) return failure(err.toErr(DeleteFailedError))
return success() return success()
@ -355,14 +378,14 @@ proc createAvailability*(
) )
let bytes = availability.freeSize.truncate(uint) let bytes = availability.freeSize.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption: if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption: if updateErr =? (await self.update(availability)).errorOption:
# rollback the reserve # rollback the reserve
trace "rolling back reserve" trace "rolling back reserve"
if rollbackErr =? (await self.repo.release(bytes)).errorOption: if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
rollbackErr.parent = updateErr rollbackErr.parent = updateErr
return failure(rollbackErr) return failure(rollbackErr)
@ -370,30 +393,32 @@ proc createAvailability*(
return success(availability) return success(availability)
proc createReservation*( method createReservation*(
self: Reservations, self: Reservations,
availabilityId: AvailabilityId, availabilityId: AvailabilityId,
slotSize: UInt256, slotSize: UInt256,
requestId: RequestId, requestId: RequestId,
slotIndex: UInt256 slotIndex: UInt256
): Future[?!Reservation] {.async.} = ): Future[?!Reservation] {.async, base.} =
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error: without availabilityKey =? availabilityId.key, error:
return failure(error) return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error: without availability =? await self.get(availabilityKey, Availability), error:
return failure(error) return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
if availability.freeSize < slotSize: if availability.freeSize < slotSize:
let error = newException( let error = newException(
BytesOutOfBoundsError, BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability") "trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error) return failure(error)
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
if createResErr =? (await self.update(reservation)).errorOption: if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr) return failure(createResErr)
@ -402,9 +427,9 @@ proc createReservation*(
availability.freeSize -= slotSize availability.freeSize -= slotSize
# update availability with reduced size # update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption: trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "rolling back reservation creation" trace "Updating availability failed, rolling back reservation creation"
without key =? reservation.key, keyError: without key =? reservation.key, keyError:
keyError.parent = updateErr keyError.parent = updateErr
@ -417,6 +442,7 @@ proc createReservation*(
return failure(updateErr) return failure(updateErr)
trace "Reservation succesfully created"
return success(reservation) return success(reservation)
proc returnBytesToAvailability*( proc returnBytesToAvailability*(
@ -429,7 +455,7 @@ proc returnBytesToAvailability*(
reservationId reservationId
availabilityId availabilityId
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error: without key =? key(reservationId, availabilityId), error:
return failure(error) return failure(error)
@ -448,7 +474,7 @@ proc returnBytesToAvailability*(
# First lets see if we can re-reserve the bytes, if the Repo's quota # First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm. # is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error: without availabilityKey =? availabilityId.key, error:
@ -460,10 +486,10 @@ proc returnBytesToAvailability*(
availability.freeSize += bytesToBeReturned availability.freeSize += bytesToBeReturned
# Update availability with returned size # Update availability with returned size
if updateErr =? (await self.update(availability)).errorOption: if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes" trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
rollbackErr.parent = updateErr rollbackErr.parent = updateErr
return failure(rollbackErr) return failure(rollbackErr)
@ -497,7 +523,7 @@ proc release*(
"trying to release an amount of bytes that is greater than the total size of the Reservation") "trying to release an amount of bytes that is greater than the total size of the Reservation")
return failure(error) return failure(error)
if releaseErr =? (await self.repo.release(bytes)).errorOption: if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError)) return failure(releaseErr.toErr(ReleaseFailedError))
reservation.size -= bytes.u256 reservation.size -= bytes.u256
@ -507,7 +533,7 @@ proc release*(
# rollback release if an update error encountered # rollback release if an update error encountered
trace "rolling back release" trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
rollbackErr.parent = err rollbackErr.parent = err
return failure(rollbackErr) return failure(rollbackErr)
return failure(err) return failure(err)
@ -537,7 +563,7 @@ proc storables(
else: else:
raiseAssert "unknown type" raiseAssert "unknown type"
without results =? await self.repo.metaDs.query(query), error: without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error) return failure(error)
# /sales/reservations # /sales/reservations
@ -621,6 +647,7 @@ proc findAvailability*(
minPrice >= availability.minPrice: minPrice >= availability.minPrice:
trace "availability matched", trace "availability matched",
id = availability.id,
size, availFreeSize = availability.freeSize, size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration, duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice, minPrice, availMinPrice = availability.minPrice,
@ -635,8 +662,8 @@ proc findAvailability*(
return some availability return some availability
trace "availability did not match", trace "availability did not match",
id = availability.id,
size, availFreeSize = availability.freeSize, size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration, duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice, minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral collateral, availMaxCollateral = availability.maxCollateral

View File

@ -1,5 +1,6 @@
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/metrics
import ../../logutils import ../../logutils
import ../../market import ../../market
@ -13,6 +14,8 @@ import ./ignored
import ./downloading import ./downloading
import ./errored import ./errored
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
type type
SalePreparing* = ref object of ErrorHandlingState SalePreparing* = ref object of ErrorHandlingState
@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
request.id, request.id,
data.slotIndex data.slotIndex
), error: ), error:
trace "Creation of reservation failed"
# Race condition:
# reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it.
# Should createReservation fail because there's no space, we proceed to SaleIgnored.
if error of BytesOutOfBoundsError:
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
codex_reservations_availability_mismatch.inc()
return some State(SaleIgnored())
return some State(SaleErrored(error: error)) return some State(SaleErrored(error: error))
trace "Reservation created succesfully"
data.reservation = some reservation data.reservation = some reservation
return some State(SaleDownloading()) return some State(SaleDownloading())

View File

@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.}
trace "Unable to delete block from repoStore" trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now: if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid) await self.deleteExpiredBlock(be.cid)
else: else:
inc self.offset inc self.offset
@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
return return
var numberReceived = 0 var numberReceived = 0
for maybeBeFuture in iter: for beFut in iter:
if be =? await maybeBeFuture: let be = await beFut
inc numberReceived inc numberReceived
await self.processBlockExpiration(be) await self.processBlockExpiration(be)
await sleepAsync(50.millis) await sleepAsync(1.millis) # cooperative scheduling
# If we received fewer blockExpirations from the iterator than we asked for, # If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time. # We're at the end of the dataset and should start from 0 next time.

View File

@ -6,7 +6,7 @@ import pkg/datastore/typedds
import ../utils/asynciter import ../utils/asynciter
type KeyVal[T] = tuple[key: Key, value: T] type KeyVal*[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T]( proc toAsyncIter*[T](
queryIter: QueryIter[T], queryIter: QueryIter[T],

View File

@ -1,679 +1,5 @@
## Nim-Codex import ./repostore/store
## Copyright (c) 2022 Status Research & Development GmbH import ./repostore/types
## Licensed under either of import ./repostore/coders
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises export store, types, coders
push: {.upraises: [].}
import pkg/chronos
import pkg/chronos/futures
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ../blocktype
import ../clock
import ../systemclock
import ../logutils
import ../merkletree
import ../utils
export blocktype, cid
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
type
QuotaUsedError* = object of CodexError
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: Datastore
clock: Clock
totalBlocks*: uint # number of blocks in the store
quotaMaxBytes*: uint # maximum available bytes
quotaUsedBytes*: uint # bytes used by the repo
quotaReservedBytes*: uint # bytes reserved by the repo
blockTtl*: Duration
started*: bool
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): uint =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE
@nBytes & cidBytes & proofBytes
proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? CodexProof.decode(data[sizeof(uint64) + n..^1])
success((cid, proof))
proc decodeCid(_: type (Cid, CodexProof), data: seq[byte]): ?!Cid =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
success(cid)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blockCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Storing block cid and proof", blockCid, key
let value = (blockCid, proof).encode()
await self.metaDs.put(key, value)
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!(Cid, CodexProof)] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
error "Unable to decode cid and proof", err = err.msg
return failure(err)
return success (cid, proof)
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!Cid] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
# This failure is expected to happen frequently:
# NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine.
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Error getting cid from datastore", err = err.msg, key
return failure(err)
return (Cid, CodexProof).decodeCid(value)
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
error "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
error "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
let (cid, proof) = cidAndProof
without blk =? await self.getBlock(cid), err:
return failure(err)
success((blk, proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
return failure(err)
await self.getBlock(cid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: SecondsSince1970): ?!BatchEntry =
## Get an expiration entry for a batch with timestamp
##
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return success((key, ttl.toBytes))
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: ?Duration): ?!BatchEntry =
## Get an expiration entry for a batch for duration since "now"
##
let duration = ttl |? self.blockTtl
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
logScope:
cid = cid
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
without expiryKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without currentExpiry =? await self.metaDs.get(expiryKey), err:
if err of DatastoreKeyNotFound:
error "No current expiry exists for the block"
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Could not read datastore key", err = err.msg
return failure(err)
logScope:
current = currentExpiry.toSecondsSince1970
ensuring = expiry
if expiry <= currentExpiry.toSecondsSince1970:
trace "Expiry is larger than or equal to requested"
return success()
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
trace "Error updating expiration metadata entry", err = err.msg
return failure(err)
return success()
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
await self.ensureExpiry(cidAndProof[0], expiry)
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
if err =? (await self.metaDs.put(
CodexTotalBlocksKey,
@(self.totalBlocks.uint64.toBytesBE))).errorOption:
trace "Error total blocks key!", err = err.msg
return failure(err)
return success()
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
without key =? makePrefixKey(self.postFixLen, blk.cid), err:
warn "Error getting key from provider", err = err.msg
return failure(err)
if await key in self.repoDs:
trace "Block already in store", cid = blk.cid
return success()
if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes:
error "Cannot store block, quota used!", used = self.totalUsed
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))
var
batch: seq[BatchEntry]
let
used = self.quotaUsedBytes + blk.data.len.uint
if err =? (await self.repoDs.put(key, blk.data)).errorOption:
error "Error storing block", err = err.msg
return failure(err)
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
warn "Unable to create block expiration metadata key", err = err.msg
return failure(err)
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption:
error "Error updating quota bytes", err = err.msg
if err =? (await self.repoDs.delete(key)).errorOption:
error "Error deleting block after failed quota update", err = err.msg
return failure(err)
return failure(err)
self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
warn "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
let used = self.quotaUsedBytes - blk.data.len.uint
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
self.updateMetrics()
return success()
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return await self.metaDs.delete(key)
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
logScope:
cid = cid
trace "Deleting block"
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
if blk =? (await self.getBlock(cid)):
if key =? makePrefixKey(self.postFixLen, cid) and
err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block!", err = err.msg
return failure(err)
if isErr (await self.updateQuotaBytesUsed(blk)):
trace "Unable to update quote-bytes-used in metadata store"
return failure("Unable to update quote-bytes-used in metadata store")
if isErr (await self.removeBlockExpirationEntry(blk.cid)):
trace "Unable to remove block expiration entry from metadata store"
return failure("Unable to remove block expiration entry from metadata store")
trace "Deleted block", cid, totalUsed = self.totalUsed
dec self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Fetching proof", key
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return success()
else:
return failure(err)
without cid =? (Cid, CodexProof).decodeCid(value), err:
return failure(err)
trace "Deleting block", cid
if err =? (await self.delBlock(cid)).errorOption:
return failure(err)
await self.metaDs.delete(key)
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(cid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
## Get block expirations from the given RepoStore
##
without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query"
return failure(err)
without queryIter =? (await self.metaDs.query(query)), err:
trace "Unable to execute block expirations query"
return failure(err)
var iter = AsyncIter[?BlockExpiration]()
proc next(): Future[?BlockExpiration] {.async.} =
if not queryIter.finished:
if pair =? (await queryIter.next()) and blockKey =? pair.key:
let expirationTimestamp = pair.data
let cidResult = Cid.init(blockKey.value)
if not cidResult.isOk:
raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
return BlockExpiration(
cid: cidResult.get,
expiration: expirationTimestamp.toSecondsSince1970
).some
else:
discard await queryIter.dispose()
iter.finish
return BlockExpiration.none
iter.next = next
return success iter
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes
if (self.totalUsed + bytes) > self.quotaMaxBytes:
trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes
return failure(
newException(QuotaNotEnoughError, "Not enough storage quota to reserver"))
self.quotaReservedBytes += bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error reserving bytes", err = err.msg
self.quotaReservedBytes += bytes
return failure(err)
return success()
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes
if (self.quotaReservedBytes.int - bytes.int) < 0:
trace "Cannot release this many bytes",
quotaReservedBytes = self.quotaReservedBytes, bytes
return failure("Cannot release this many bytes")
self.quotaReservedBytes -= bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error releasing bytes", err = err.msg
self.quotaReservedBytes -= bytes
return failure(err)
trace "Released bytes", bytes
self.updateMetrics()
return success()
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting repo"
without total =? await self.metaDs.get(CodexTotalBlocksKey), err:
if not (err of DatastoreKeyNotFound):
error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey
if total.len > 0:
self.totalBlocks = uint64.fromBytesBE(total).uint
trace "Number of blocks in store at start", total = self.totalBlocks
## load current persist and cache bytes from meta ds
without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting cache bytes from datastore",
err = err.msg, key = $QuotaUsedKey
raise newException(Defect, err.msg)
if quotaUsedBytes.len > 0:
self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint
notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes
without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting persist bytes from datastore",
err = err.msg, key = $QuotaReservedKey
raise newException(Defect, err.msg)
if quotaReservedBytes.len > 0:
self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint
if self.quotaUsedBytes > self.quotaMaxBytes:
raiseAssert "All storage quota used, increase storage quota!"
notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes
self.updateMetrics()
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: metaDs,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)

View File

@ -0,0 +1,47 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
##
import std/sugar
import pkg/libp2p/cid
import pkg/serde/json
import pkg/stew/byteutils
import pkg/stew/endians2
import ./types
import ../../errors
import ../../merkletree
import ../../utils/json
proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes()
proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(i: uint64): seq[byte] =
@(i.toBytesBE)
proc decode*(T: type uint64, bytes: seq[byte]): ?!T =
if bytes.len >= sizeof(uint64):
success(uint64.fromBytesBE(bytes))
else:
failure("Not enough bytes to decode `uint64`")
proc encode*(i: Natural | enum): seq[byte] = cast[uint64](i).encode
proc decode*(T: typedesc[Natural | enum], bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[T](ui))

View File

@ -0,0 +1,213 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ../blockstore
import ../keyutils
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
await self.metaDs.modifyGet(key,
proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} =
var
md: LeafMetadata
res: StoreResultKind
if currMd =? maybeCurrMd:
md = currMd
res = AlreadyInStore
else:
md = LeafMetadata(blkCid: blkCid, proof: proof)
res = Stored
(md.some, res)
)
proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without leafMd =? await get[LeafMetadata](self.metaDs, key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
success(leafMd)
proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} =
await self.metaDs.modify(CodexTotalBlocksKey,
proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
currCount + plusCount - minusCount
else:
plusCount - minusCount
self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
count.some
)
proc updateQuotaUsage*(
self: RepoStore,
plusUsed: NBytes = 0.NBytes,
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes
): Future[?!void] {.async.} =
await self.metaDs.modify(QuotaUsedKey,
proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
var usage: QuotaUsage
if currUsage =? maybeCurrUsage:
usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved)
else:
usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
if usage.used + usage.reserved > self.quotaMaxBytes:
raise newException(QuotaNotEnoughError,
"Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
$usage.reserved & ", limit: " & $self.quotaMaxBytes)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some
)
proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,
plusRefCount: Natural = 0,
minusRefCount: Natural = 0,
minExpiry: SecondsSince1970 = 0
): Future[?!void] {.async.} =
if cid.isEmpty:
return success()
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
await self.metaDs.modify(metaKey,
proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} =
if currBlockMd =? maybeCurrBlockMd:
BlockMetadata(
size: currBlockMd.size,
expiry: max(currBlockMd.expiry, minExpiry),
refCount: currBlockMd.refCount + plusRefCount - minusRefCount
).some
else:
raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found")
)
proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} =
var
md: BlockMetadata
res: StoreResult
if currMd =? maybeCurrMd:
if currMd.size == blk.data.len.NBytes:
md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount)
res = StoreResult(kind: AlreadyInStore)
# making sure that the block acutally is stored in the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if not hasBlock:
warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
else:
raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid)
else:
md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0)
res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
(md.some, res)
)
proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} =
if cid.isEmpty:
return success(DeleteResult(kind: InUse))
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} =
var
maybeMeta: ?BlockMetadata
res: DeleteResult
if currMd =? maybeCurrMd:
if currMd.refCount == 0 or currMd.expiry < expiryLimit:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: Deleted, released: currMd.size)
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
else:
maybeMeta = currMd.some
res = DeleteResult(kind: InUse)
else:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: NotFound)
# making sure that the block acutally is removed from the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if hasBlock:
warn "Block metadata is absent, but block is present. Removing block.", cid
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
(maybeMeta, res)
)

View File

@ -0,0 +1,395 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/[cid, multicodec]
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ./operations
import ../blockstore
import ../keyutils
import ../queryiterhelper
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
import ../../utils
export blocktype, cid
logScope:
topics = "codex repostore"
###########################################################
# BlockStore API
###########################################################
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
trace "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
without blk =? await self.getBlock(leafMd.blkCid), err:
return failure(err)
success((blk, leafMd.proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.getBlock(leafMd.blkCid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
await self.updateBlockMetadata(cid, minExpiry = expiry)
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blkCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
treeCid = treeCid
index = index
blkCid = blkCid
trace "Storing LeafMetadata"
without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err:
return failure(err)
if blkCid.mcodec == BlockCodec:
if res == Stored:
if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
return failure(err)
trace "Leaf metadata stored, block refCount incremented"
else:
trace "Leaf metadata already exists"
return success()
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!(Cid, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success((leafMd.blkCid, leafMd.proof))
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!Cid] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success(leafMd.blkCid)
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds
without res =? await self.storeBlock(blk, expiry), err:
return failure(err)
if res.kind == Stored:
trace "Block Stored"
if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# rollback changes
without delRes =? await self.tryDeleteBlock(blk.cid), err:
return failure(err)
return failure(err)
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err)
else:
trace "Block already exists"
return success()
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore when block refCount is 0 or block is expired
##
logScope:
cid = cid
trace "Attempting to delete a block"
without res =? await self.tryDeleteBlock(cid, self.clock.now()), err:
return failure(err)
if res.kind == Deleted:
trace "Block deleted"
if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
return failure(err)
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
return failure(err)
elif res.kind == InUse:
trace "Block in use, refCount > 0 and not expired"
else:
trace "Block not found in store"
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success()
else:
return failure(err)
if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption:
if not (err of BlockNotFoundError):
return failure(err)
await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(leafMd.blkCid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
## Get iterator with block expirations
##
without beQuery =? createBlockExpirationQuery(maxNumber, offset), err:
error "Unable to format block expirations query", err = err.msg
return failure(err)
without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter,
proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
###########################################################
# RepoStore procs
###########################################################
proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", bytes
await self.updateQuotaUsage(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", bytes
await self.updateQuotaUsage(minusReserved = bytes)
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting rep"
if err =? (await self.updateTotalBlocksCount()).errorOption:
raise newException(CodexError, err.msg)
if err =? (await self.updateQuotaUsage()).errorOption:
raise newException(CodexError, err.msg)
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false

View File

@ -0,0 +1,107 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import ../blockstore
import ../../clock
import ../../errors
import ../../merkletree
import ../../systemclock
import ../../units
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 8.GiBs
type
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage
totalBlocks*: Natural
blockTtl*: Duration
started*: bool
QuotaUsage* {.serialize.} = object
used*: NBytes
reserved*: NBytes
BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
size*: NBytes
refCount*: Natural
LeafMetadata* {.serialize.} = object
blkCid*: Cid
proof*: CodexProof
BlockExpiration* {.serialize.} = object
cid*: Cid
expiry*: SecondsSince1970
DeleteResultKind* {.serialize.} = enum
Deleted = 0, # block removed from store
InUse = 1, # block not removed, refCount > 0 and not expired
NotFound = 2 # block not found in store
DeleteResult* {.serialize.} = object
kind*: DeleteResultKind
released*: NBytes
StoreResultKind* {.serialize.} = enum
Stored = 0, # new block stored
AlreadyInStore = 1 # block already in store
StoreResult* {.serialize.} = object
kind*: StoreResultKind
used*: NBytes
func quotaUsedBytes*(self: RepoStore): NBytes =
self.quotaUsage.used
func quotaReservedBytes*(self: RepoStore): NBytes =
self.quotaUsage.reserved
func totalUsed*(self: RepoStore): NBytes =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): NBytes =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: NBytes): bool =
return bytes < self.available()
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: TypedDatastore.init(metaDs),
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)

View File

@ -38,7 +38,6 @@ type
StoreStream* = ref object of SeekableStream StoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
method initStream*(s: StoreStream) = method initStream*(s: StoreStream) =
if s.objName.len == 0: if s.objName.len == 0:
@ -57,13 +56,15 @@ proc new*(
result = StoreStream( result = StoreStream(
store: store, store: store,
manifest: manifest, manifest: manifest,
pad: pad,
offset: 0) offset: 0)
result.initStream() result.initStream()
method `size`*(self: StoreStream): int = method `size`*(self: StoreStream): int =
bytes(self.manifest, self.pad).int ## The size of a StoreStream is the size of the original dataset, without
## padding or parity blocks.
let m = self.manifest
(if m.protected: m.originalDatasetSize else: m.datasetSize).int
proc `size=`*(self: StoreStream, size: int) proc `size=`*(self: StoreStream, size: int)
{.error: "Setting the size is forbidden".} = {.error: "Setting the size is forbidden".} =

View File

@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes
logutils.formatIt(NBytes): $it logutils.formatIt(NBytes): $it
const const
MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz
MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz
GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz
proc KiBs*(v: Natural): NBytes = v.NBytes * KiB
proc MiBs*(v: Natural): NBytes = v.NBytes * MiB proc MiBs*(v: Natural): NBytes = v.NBytes * MiB
proc GiBs*(v: Natural): NBytes = v.NBytes * GiB
func divUp*[T: NBytes](a, b : T): int = func divUp*[T: NBytes](a, b : T): int =
## Division with result rounded up (rather than truncated as in 'div') ## Division with result rounded up (rather than truncated as in 'div')

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import std/sequtils import std/sequtils
import std/sugar
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/questionable import pkg/questionable
@ -24,33 +25,28 @@ type
testBlockExpirations*: seq[BlockExpiration] testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool getBlockExpirationsThrows*: bool
iteratorIndex: int
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =
self.delBlockCids.add(cid) self.delBlockCids.add(cid)
self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid)
dec self.iteratorIndex
return success() return success()
method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} =
if self.getBlockExpirationsThrows: if self.getBlockExpirationsThrows:
raise new CatchableError raise new CatchableError
self.getBeMaxNumber = maxNumber self.getBeMaxNumber = maxNumber
self.getBeOffset = offset self.getBeOffset = offset
var iter = AsyncIter[?BlockExpiration]() let
testBlockExpirationsCpy = @(self.testBlockExpirations)
limit = min(offset + maxNumber, len(testBlockExpirationsCpy))
self.iteratorIndex = offset let
var numberLeft = maxNumber iter1 = AsyncIter[int].new(offset..<limit)
proc next(): Future[?BlockExpiration] {.async.} = iter2 = map[int, BlockExpiration](iter1,
if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): proc (i: int): Future[BlockExpiration] {.async.} =
dec numberLeft testBlockExpirationsCpy[i]
let selectedBlock = self.testBlockExpirations[self.iteratorIndex] )
inc self.iteratorIndex
return selectedBlock.some
iter.finish
return BlockExpiration.none
iter.next = next success(iter2)
return success iter

View File

@ -0,0 +1,33 @@
import pkg/chronos
import pkg/codex/sales
import pkg/codex/stores
import pkg/questionable/results
type
MockReservations* = ref object of Reservations
createReservationThrowBytesOutOfBoundsError: bool
proc new*(
T: type MockReservations,
repo: RepoStore
): MockReservations =
## Create a mock clock instance
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) =
self.createReservationThrowBytesOutOfBoundsError = flag
method createReservation*(
self: MockReservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256): Future[?!Reservation] {.async.} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)

View File

@ -5,7 +5,6 @@ import std/cpuinfo
import pkg/libp2p import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/taskpools import pkg/taskpools
import pkg/codex/codextypes import pkg/codex/codextypes
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/stores import pkg/codex/stores

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stint import pkg/stint
@ -32,6 +33,7 @@ import pkg/codex/discovery
import pkg/codex/erasure import pkg/codex/erasure
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore/coders
import pkg/codex/utils/asynciter import pkg/codex/utils/asynciter
import pkg/codex/indexingstrategy import pkg/codex/indexingstrategy
@ -110,10 +112,11 @@ asyncchecksuite "Test Node - Host contracts":
for index in 0..<manifest.blocksCount: for index in 0..<manifest.blocksCount:
let let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey) bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry check blkMd.expiry == expectedExpiry
test "onStore callback is set": test "onStore callback is set":
check sales.onStore.isSome check sales.onStore.isSome
@ -131,7 +134,7 @@ asyncchecksuite "Test Node - Host contracts":
return success() return success()
(await onStore(request, 1.u256, onBlocks)).tryGet() (await onStore(request, 1.u256, onBlocks)).tryGet()
check fetchedBytes == 262144 check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init( let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots) 0, verifiable.numSlotBlocks() - 1, verifiable.numSlots)
@ -139,7 +142,8 @@ asyncchecksuite "Test Node - Host contracts":
for index in indexer.getIndicies(1): for index in indexer.getIndicies(1):
let let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey) bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970 check blkMd.expiry == request.expiry.toSecondsSince1970

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stint import pkg/stint

View File

@ -1,20 +1,66 @@
import std/unittest import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/datastore
import pkg/stew/byteutils
import pkg/codex/contracts/requests import pkg/codex/contracts/requests
import pkg/codex/sales/states/preparing
import pkg/codex/sales/states/downloading import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled import pkg/codex/sales/states/filled
import pkg/codex/sales/states/ignored
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import ../../../asynctest
import ../../helpers
import ../../examples import ../../examples
import ../../helpers/mockmarket
import ../../helpers/mockreservations
import ../../helpers/mockclock
suite "sales state 'preparing'": asyncchecksuite "sales state 'preparing'":
let request = StorageRequest.example let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256 let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var agent: SalesAgent
var state: SalePreparing var state: SalePreparing
var repo: RepoStore
var availability: Availability
var context: SalesContext
var reservations: MockReservations
setup: setup:
availability = Availability(
totalSize: request.ask.slotSize + 100.u256,
freeSize: request.ask.slotSize + 100.u256,
duration: request.ask.duration + 60.u256,
minPrice: request.ask.pricePerSlot - 10.u256,
maxCollateral: request.ask.collateral + 400.u256
)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
state = SalePreparing.new() state = SalePreparing.new()
context = SalesContext(
market: market,
clock: clock
)
reservations = MockReservations.new(repo)
context.reservations = reservations
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
teardown:
await repo.stop()
test "switches to cancelled state when request expires": test "switches to cancelled state when request expires":
let next = state.onCancelled(request) let next = state.onCancelled(request)
@ -27,3 +73,28 @@ suite "sales state 'preparing'":
test "switches to filled state when slot is filled": test "switches to filled state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex) let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled check !next of SaleFilled
proc createAvailability() {.async.} =
let a = await reservations.createAvailability(
availability.totalSize,
availability.duration,
availability.minPrice,
availability.maxCollateral
)
availability = a.get
test "run switches to ignored when no availability":
let next = await state.run(agent)
check !next of SaleIgnored
test "run switches to downloading when reserved":
await createAvailability()
let next = await state.run(agent)
check !next of SaleDownloading
test "run switches to ignored when reserve fails with BytesOutOfBounds":
await createAvailability()
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
let next = await state.run(agent)
check !next of SaleIgnored

View File

@ -1,4 +1,5 @@
import std/random import std/random
import std/sequtils
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -6,6 +7,7 @@ import pkg/chronos
import pkg/datastore import pkg/datastore
import pkg/codex/stores import pkg/codex/stores
import pkg/codex/errors
import pkg/codex/sales import pkg/codex/sales
import pkg/codex/utils/json import pkg/codex/utils/json
@ -13,6 +15,8 @@ import ../../asynctest
import ../examples import ../examples
import ../helpers import ../helpers
const CONCURRENCY_TESTS_COUNT = 1000
asyncchecksuite "Reservations module": asyncchecksuite "Reservations module":
var var
repo: RepoStore repo: RepoStore
@ -73,9 +77,9 @@ asyncchecksuite "Reservations module":
check availability.id != AvailabilityId.default check availability.id != AvailabilityId.default
test "creating availability reserves bytes in repo": test "creating availability reserves bytes in repo":
let orig = repo.available let orig = repo.available.uint
let availability = createAvailability() let availability = createAvailability()
check repo.available == (orig.u256 - availability.freeSize).truncate(uint) check repo.available.uint == (orig.u256 - availability.freeSize).truncate(uint)
test "can get all availabilities": test "can get all availabilities":
let availability1 = createAvailability() let availability1 = createAvailability()
@ -148,6 +152,39 @@ asyncchecksuite "Reservations module":
check created.isErr check created.isErr
check created.error of BytesOutOfBoundsError check created.error of BytesOutOfBoundsError
test "cannot create reservation larger than availability size - concurrency test":
proc concurrencyTest(): Future[void] {.async.} =
let availability = createAvailability()
let one = reservations.createReservation(
availability.id,
availability.totalSize - 1,
RequestId.example,
UInt256.example
)
let two = reservations.createReservation(
availability.id,
availability.totalSize,
RequestId.example,
UInt256.example
)
let oneResult = await one
let twoResult = await two
check oneResult.isErr or twoResult.isErr
if oneResult.isErr:
check oneResult.error of BytesOutOfBoundsError
if twoResult.isErr:
check twoResult.error of BytesOutOfBoundsError
var futures: seq[Future[void]]
for _ in 1..CONCURRENCY_TESTS_COUNT:
futures.add(concurrencyTest())
await allFuturesThrowing(futures)
test "creating reservation reduces availability size": test "creating reservation reduces availability size":
let availability = createAvailability() let availability = createAvailability()
let orig = availability.freeSize let orig = availability.freeSize
@ -211,7 +248,7 @@ asyncchecksuite "Reservations module":
check updated.freeSize > orig check updated.freeSize > orig
check (updated.freeSize - orig) == 200.u256 check (updated.freeSize - orig) == 200.u256
check (repo.quotaReservedBytes - origQuota) == 200 check (repo.quotaReservedBytes - origQuota) == 200.NBytes
test "update releases quota when lowering size": test "update releases quota when lowering size":
let let
@ -220,7 +257,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize - 100 availability.totalSize = availability.totalSize - 100
check isOk await reservations.update(availability) check isOk await reservations.update(availability)
check (origQuota - repo.quotaReservedBytes) == 100 check (origQuota - repo.quotaReservedBytes) == 100.NBytes
test "update reserves quota when growing size": test "update reserves quota when growing size":
let let
@ -229,7 +266,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize + 100 availability.totalSize = availability.totalSize + 100
check isOk await reservations.update(availability) check isOk await reservations.update(availability)
check (repo.quotaReservedBytes - origQuota) == 100 check (repo.quotaReservedBytes - origQuota) == 100.NBytes
test "reservation can be partially released": test "reservation can be partially released":
let availability = createAvailability() let availability = createAvailability()
@ -333,17 +370,17 @@ asyncchecksuite "Reservations module":
check got.error of NotExistsError check got.error of NotExistsError
test "can get available bytes in repo": test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes check reservations.available == DefaultQuotaBytes.uint
test "reports quota available to be reserved": test "reports quota available to be reserved":
check reservations.hasAvailable(DefaultQuotaBytes - 1) check reservations.hasAvailable(DefaultQuotaBytes.uint - 1)
test "reports quota not available to be reserved": test "reports quota not available to be reserved":
check not reservations.hasAvailable(DefaultQuotaBytes + 1) check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1)
test "fails to create availability with size that is larger than available quota": test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability( let created = await reservations.createAvailability(
(DefaultQuotaBytes + 1).u256, (DefaultQuotaBytes.uint + 1).u256,
UInt256.example, UInt256.example,
UInt256.example, UInt256.example,
UInt256.example UInt256.example

View File

@ -2,7 +2,7 @@ import std/sequtils
import std/sugar import std/sugar
import std/times import std/times
import pkg/chronos import pkg/chronos
import pkg/datastore import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/codex/sales import pkg/codex/sales

View File

@ -6,5 +6,9 @@ import ./states/testinitialproving
import ./states/testfilled import ./states/testfilled
import ./states/testproving import ./states/testproving
import ./states/testsimulatedproving import ./states/testsimulatedproving
import ./states/testcancelled
import ./states/testerrored
import ./states/testignored
import ./states/testpreparing
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}

View File

@ -0,0 +1,71 @@
import std/unittest
import std/random
import pkg/stew/objects
import pkg/questionable
import pkg/questionable/results
import pkg/codex/clock
import pkg/codex/stores/repostore/types
import pkg/codex/stores/repostore/coders
import ../../helpers
checksuite "Test coders":
proc rand(T: type NBytes): T =
rand(Natural).NBytes
proc rand(E: type[enum]): E =
let ordinals = enumRangeInt64(E)
E(ordinals[rand(ordinals.len - 1)])
proc rand(T: type QuotaUsage): T =
QuotaUsage(
used: rand(NBytes),
reserved: rand(NBytes)
)
proc rand(T: type BlockMetadata): T =
BlockMetadata(
expiry: rand(SecondsSince1970),
size: rand(NBytes),
refCount: rand(Natural)
)
proc rand(T: type DeleteResult): T =
DeleteResult(
kind: rand(DeleteResultKind),
released: rand(NBytes)
)
proc rand(T: type StoreResult): T =
StoreResult(
kind: rand(StoreResultKind),
used: rand(NBytes)
)
test "Natural encode/decode":
for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]:
check:
success(val) == Natural.decode(encode(val))
test "QuotaUsage encode/decode":
for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)):
check:
success(val) == QuotaUsage.decode(encode(val))
test "BlockMetadata encode/decode":
for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):
check:
success(val) == BlockMetadata.decode(encode(val))
test "DeleteResult encode/decode":
for val in newSeqWith[DeleteResult](100, rand(DeleteResult)):
check:
success(val) == DeleteResult.decode(encode(val))
test "StoreResult encode/decode":
for val in newSeqWith[StoreResult](100, rand(StoreResult)):
check:
success(val) == StoreResult.decode(encode(val))

View File

@ -34,10 +34,10 @@ checksuite "BlockMaintainer":
var testBe2: BlockExpiration var testBe2: BlockExpiration
var testBe3: BlockExpiration var testBe3: BlockExpiration
proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration =
BlockExpiration( BlockExpiration(
cid: bt.Block.example.cid, cid: bt.Block.example.cid,
expiration: expiration expiry: expiry
) )
setup: setup:
@ -186,4 +186,3 @@ checksuite "BlockMaintainer":
await invokeTimerManyTimes() await invokeTimerManyTimes()
# Second new block has expired # Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]

View File

@ -23,6 +23,8 @@ import ../helpers/mockclock
import ../examples import ../examples
import ./commonstoretests import ./commonstoretests
import ./repostore/testcoders
checksuite "Test RepoStore start/stop": checksuite "Test RepoStore start/stop":
var var
@ -34,24 +36,24 @@ checksuite "Test RepoStore start/stop":
metaDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet()
test "Should set started flag once started": test "Should set started flag once started":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
check repo.started check repo.started
test "Should set started flag to false once stopped": test "Should set started flag to false once stopped":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
await repo.stop() await repo.stop()
check not repo.started check not repo.started
test "Should allow start to be called multiple times": test "Should allow start to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start() await repo.start()
await repo.start() await repo.start()
check repo.started check repo.started
test "Should allow stop to be called multiple times": test "Should allow stop to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.stop() await repo.stop()
await repo.stop() await repo.stop()
check not repo.started check not repo.started
@ -73,7 +75,7 @@ asyncchecksuite "RepoStore":
mockClock = MockClock.new() mockClock = MockClock.new()
mockClock.set(now) mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200) repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb)
teardown: teardown:
(await repoDs.close()).tryGet (await repoDs.close()).tryGet
@ -85,117 +87,107 @@ asyncchecksuite "RepoStore":
test "Should update current used bytes on block put": test "Should update current used bytes on block put":
let blk = createTestBlock(200) let blk = createTestBlock(200)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check: check:
repo.quotaUsedBytes == 200 repo.quotaUsedBytes == 200'nb
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
test "Should update current used bytes on block delete": test "Should update current used bytes on block delete":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
(await repo.delBlock(blk.cid)).tryGet (await repo.delBlock(blk.cid)).tryGet
check: check:
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
test "Should not update current used bytes if block exist": test "Should not update current used bytes if block exist":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0 check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
# put again # put again
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100 check repo.quotaUsedBytes == 100'nb
check:
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
test "Should fail storing passed the quota": test "Should fail storing passed the quota":
let blk = createTestBlock(300) let blk = createTestBlock(300)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
expect QuotaUsedError: expect QuotaNotEnoughError:
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
test "Should reserve bytes": test "Should reserve bytes":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check: check:
repo.totalUsed == 200 repo.totalUsed == 200'nb
repo.quotaUsedBytes == 100 repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 100 repo.quotaReservedBytes == 100'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
test "Should not reserve bytes over max quota": test "Should not reserve bytes over max quota":
let blk = createTestBlock(100) let blk = createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
expect QuotaNotEnoughError: expect QuotaNotEnoughError:
(await repo.reserve(101)).tryGet (await repo.reserve(101'nb)).tryGet
check: check:
repo.totalUsed == 100 repo.totalUsed == 100'nb
repo.quotaUsedBytes == 100 repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 0 repo.quotaReservedBytes == 0'nb
expect DatastoreKeyNotFound:
discard (await metaDs.get(QuotaReservedKey)).tryGet
test "Should release bytes": test "Should release bytes":
discard createTestBlock(100) discard createTestBlock(100)
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
(await repo.release(100)).tryGet (await repo.release(100'nb)).tryGet
check: check:
repo.totalUsed == 0 repo.totalUsed == 0'nb
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 0 repo.quotaReservedBytes == 0'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
test "Should not release bytes less than quota": test "Should not release bytes less than quota":
check repo.totalUsed == 0 check repo.totalUsed == 0'nb
(await repo.reserve(100)).tryGet (await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100 check repo.totalUsed == 100'nb
expect CatchableError: expect RangeDefect:
(await repo.release(101)).tryGet (await repo.release(101'nb)).tryGet
check: check:
repo.totalUsed == 100 repo.totalUsed == 100'nb
repo.quotaUsedBytes == 0 repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 100 repo.quotaReservedBytes == 100'nb
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = proc getExpirations(): Future[seq[BlockExpiration]] {.async.} =
let let iter = (await repo.getBlockExpirations(100, 0)).tryGet()
query = Query.init(key)
responseIter = (await metaDs.query(query)).tryGet var res = newSeq[BlockExpiration]()
response = (await allFinished(toSeq(responseIter))) for fut in iter:
.mapIt(it.read.tryGet) let be = await fut
.filterIt(it.key.isSome) res.add(be)
return response
res
test "Should store block expiration timestamp": test "Should store block expiration timestamp":
let let
@ -203,49 +195,40 @@ asyncchecksuite "RepoStore":
blk = createTestBlock(100) blk = createTestBlock(100)
let let
expectedExpiration: SecondsSince1970 = 123 + 10 expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, duration.some)).tryGet (await repo.putBlock(blk, duration.some)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
test "Should store block with default expiration timestamp when not provided": test "Should store block with default expiration timestamp when not provided":
let let
blk = createTestBlock(100) blk = createTestBlock(100)
let let
expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk)).tryGet (await repo.putBlock(blk)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
test "Should refuse update expiry with negative timestamp": test "Should refuse update expiry with negative timestamp":
let let
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + 10 expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some 10.seconds)).tryGet (await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expect ValueError: expect ValueError:
(await repo.ensureExpiry(blk.cid, -1)).tryGet (await repo.ensureExpiry(blk.cid, -1)).tryGet
@ -262,56 +245,45 @@ asyncchecksuite "RepoStore":
test "Should update block expiration timestamp when new expiration is farther": test "Should update block expiration timestamp when new expiration is farther":
let let
duration = 10
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some duration.seconds)).tryGet (await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet (await repo.ensureExpiry(blk.cid, now + 20)).tryGet
response = await queryMetaDs(expectedKey) let updatedExpirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration notin updatedExpirations
!response[0].key == expectedKey updatedExpectedExpiration in updatedExpirations
response[0].data == updatedExpectedExpiration.toBytes
test "Should not update block expiration timestamp when current expiration is farther then new one": test "Should not update block expiration timestamp when current expiration is farther then new one":
let let
duration = 10
blk = createTestBlock(100) blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5)
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
(await repo.putBlock(blk, some 10.seconds)).tryGet
(await repo.putBlock(blk, some duration.seconds)).tryGet let expirations = await getExpirations()
var response = await queryMetaDs(expectedKey)
check: check:
response.len == 1 expectedExpiration in expirations
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet (await repo.ensureExpiry(blk.cid, now + 5)).tryGet
response = await queryMetaDs(expectedKey) let updatedExpirations = await getExpirations()
check: check:
response.len == 1 expectedExpiration in updatedExpirations
!response[0].key == expectedKey updatedExpectedExpiration notin updatedExpirations
response[0].data == expectedExpiration.toBytes
test "delBlock should remove expiration metadata": test "delBlock should remove expiration metadata":
let let
@ -321,18 +293,18 @@ asyncchecksuite "RepoStore":
(await repo.putBlock(blk, 10.seconds.some)).tryGet (await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet (await repo.delBlock(blk.cid)).tryGet
let response = await queryMetaDs(expectedKey) let expirations = await getExpirations()
check: check:
response.len == 0 expirations.len == 0
test "Should retrieve block expiration information": test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0) var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err: without iter =? (await beIter), err:
return expirations return expirations
for be in toSeq(iter): for beFut in toSeq(iter):
if value =? (await be): let value = await beFut
expirations.add(value) expirations.add(value)
return expirations return expirations
@ -343,12 +315,12 @@ asyncchecksuite "RepoStore":
blk3 = createTestBlock(12) blk3 = createTestBlock(12)
let let
expectedExpiration: SecondsSince1970 = 123 + 10 expectedExpiration: SecondsSince1970 = now + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check: check:
be.cid == expectedBlock.cid be.cid == expectedBlock.cid
be.expiration == expectedExpiration be.expiry == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet (await repo.putBlock(blk1, duration.some)).tryGet

View File

@ -58,6 +58,14 @@ suite "Indexing strategies":
expect IndexingWrongIterationsError: expect IndexingWrongIterationsError:
discard LinearStrategy.init(0, 10, 0) discard LinearStrategy.init(0, 10, 0)
test "should split elements evenly when possible":
let
l = LinearStrategy.init(0, 11, 3)
check:
toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it)
test "linear - oob": test "linear - oob":
expect IndexingError: expect IndexingError:
discard linear.getIndicies(3) discard linear.getIndicies(3)

View File

@ -74,3 +74,36 @@ checksuite "Manifest":
test "Should encode/decode to/from verifiable manifest": test "Should encode/decode to/from verifiable manifest":
check: check:
encodeDecode(verifiableManifest) == verifiableManifest encodeDecode(verifiableManifest) == verifiableManifest
suite "Manifest - Attribute Inheritance":
proc makeProtectedManifest(strategy: StrategyType): Manifest =
Manifest.new(
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 1.MiBs,
datasetSize = 100.MiBs,
),
treeCid = Cid.example,
datasetSize = 200.MiBs,
ecK = 1,
ecM = 1,
strategy = strategy
)
test "Should preserve interleaving strategy for protected manifest in verifiable manifest":
var verifiable = Manifest.new(
manifest = makeProtectedManifest(SteppedStrategy),
verifyRoot = Cid.example,
slotRoots = @[Cid.example, Cid.example]
).tryGet()
check verifiable.protectedStrategy == SteppedStrategy
verifiable = Manifest.new(
manifest = makeProtectedManifest(LinearStrategy),
verifyRoot = Cid.example,
slotRoots = @[Cid.example, Cid.example]
).tryGet()
check verifiable.protectedStrategy == LinearStrategy

View File

@ -1,12 +1,15 @@
import pkg/chronos import pkg/chronos
import pkg/questionable/results import pkg/questionable/results
import pkg/codex/streams import pkg/codex/[
import pkg/codex/stores streams,
import pkg/codex/manifest stores,
import pkg/codex/blocktype as bt indexingstrategy,
manifest,
blocktype as bt]
import ../asynctest import ../asynctest
import ./examples
import ./helpers import ./helpers
asyncchecksuite "StoreStream": asyncchecksuite "StoreStream":
@ -99,3 +102,40 @@ asyncchecksuite "StoreStream":
await stream.readExactly(addr buf[0], 15) await stream.readExactly(addr buf[0], 15)
check sequentialBytes(buf,15,0) check sequentialBytes(buf,15,0)
suite "StoreStream - Size Tests":
var stream: StoreStream
teardown:
await stream.close()
test "Should return dataset size as stream size":
let manifest = Manifest.new(
treeCid = Cid.example,
datasetSize = 80.NBytes,
blockSize = 10.NBytes
)
stream = StoreStream.new(CacheStore.new(), manifest)
check stream.size == 80
test "Should not count parity/padding bytes as part of stream size":
let protectedManifest = Manifest.new(
treeCid = Cid.example,
datasetSize = 120.NBytes, # size including parity bytes
blockSize = 10.NBytes,
version = CIDv1,
hcodec = Sha256HashCodec,
codec = BlockCodec,
ecK = 2,
ecM = 1,
originalTreeCid = Cid.example,
originalDatasetSize = 80.NBytes, # size without parity bytes
strategy = StrategyType.SteppedStrategy
)
stream = StoreStream.new(CacheStore.new(), protectedManifest)
check stream.size == 80

View File

@ -1,5 +1,6 @@
import std/sequtils import std/sequtils
from pkg/libp2p import `==` from pkg/libp2p import `==`
import pkg/codex/units
import ./twonodes import ./twonodes
twonodessuite "REST API", debug1 = false, debug2 = false: twonodessuite "REST API", debug1 = false, debug2 = false:
@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false:
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet() let space = client1.space().tryGet()
check: check:
space.totalBlocks == 2.uint space.totalBlocks == 2
space.quotaMaxBytes == 8589934592.uint space.quotaMaxBytes == 8589934592.NBytes
space.quotaUsedBytes == 65592.uint space.quotaUsedBytes == 65592.NBytes
space.quotaReservedBytes == 12.uint space.quotaReservedBytes == 12.NBytes
test "node lists local files": test "node lists local files":
let content1 = "some file contents" let content1 = "some file contents"