Merge branch 'master' into testapi-delete

# Conflicts:
#	codex/rest/api.nim
This commit is contained in:
Ben 2024-06-24 10:06:40 +02:00
commit d2f207ca3d
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
42 changed files with 1455 additions and 1710 deletions

View File

@ -9,7 +9,7 @@ on:
env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: v1.6.14
nim_version: pinned
concurrency:

View File

@ -6,7 +6,7 @@ on:
env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: v1.6.14, v1.6.16, v1.6.18
nim_version: pinned, v1.6.16, v1.6.18
jobs:
matrix:

View File

@ -5,6 +5,30 @@
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
# This is the Nim version used locally and in regular CI builds.
# Can be a specific version tag, a branch name, or a commit hash.
# Can be overridden by setting the NIM_COMMIT environment variable
# before calling make.
#
# For readability in CI, if NIM_COMMIT is set to "pinned",
# this will also default to the version pinned here.
#
# If NIM_COMMIT is set to "nimbusbuild", this will use the
# version pinned by nimbus-build-system.
PINNED_NIM_VERSION := v1.6.14
ifeq ($(NIM_COMMIT),)
NIM_COMMIT := $(PINNED_NIM_VERSION)
else ifeq ($(NIM_COMMIT),pinned)
NIM_COMMIT := $(PINNED_NIM_VERSION)
endif
ifeq ($(NIM_COMMIT),nimbusbuild)
undefine NIM_COMMIT
else
export NIM_COMMIT
endif
SHELL := bash # the shell used internally by Make
# used inside the included makefiles

View File

@ -1,209 +0,0 @@
{
"clangVersion": "",
"gccVersion": "",
"hostCPU": "arm64",
"hostOS": "macosx",
"items": {
"asynctest": {
"commit": "8e2f4e73b97123be0f0041c129942b32df23ecb1",
"dir": "vendor/asynctest",
"url": "https://github.com/codex-storage/asynctest"
},
"dnsclient.nim": {
"commit": "23214235d4784d24aceed99bbfe153379ea557c8",
"dir": "vendor/dnsclient.nim",
"url": "https://github.com/ba0f3/dnsclient.nim"
},
"lrucache.nim": {
"commit": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd",
"dir": "vendor/lrucache.nim",
"url": "https://github.com/status-im/lrucache.nim"
},
"nim-bearssl": {
"commit": "99fcb3405c55b27cfffbf60f5368c55da7346f23",
"dir": "vendor/nim-bearssl",
"url": "https://github.com/status-im/nim-bearssl"
},
"nim-blscurve": {
"commit": "48d8668c5a9a350d3a7ee0c3713ef9a11980a40d",
"dir": "vendor/nim-blscurve",
"url": "https://github.com/status-im/nim-blscurve"
},
"nim-chronicles": {
"commit": "c9c8e58ec3f89b655a046c485f622f9021c68b61",
"dir": "vendor/nim-chronicles",
"url": "https://github.com/status-im/nim-chronicles"
},
"nim-chronos": {
"commit": "0277b65be2c7a365ac13df002fba6e172be55537",
"dir": "vendor/nim-chronos",
"url": "https://github.com/status-im/nim-chronos"
},
"nim-confutils": {
"commit": "2028b41602b3abf7c9bf450744efde7b296707a2",
"dir": "vendor/nim-confutils",
"url": "https://github.com/status-im/nim-confutils"
},
"nim-contract-abi": {
"commit": "61f8f59b3917d8e27c6eb4330a6d8cf428e98b2d",
"dir": "vendor/nim-contract-abi",
"url": "https://github.com/status-im/nim-contract-abi"
},
"nim-datastore": {
"commit": "0cde8aeb67c59fd0ac95496dc6b5e1168d6632aa",
"dir": "vendor/nim-datastore",
"url": "https://github.com/codex-storage/nim-datastore"
},
"nim-faststreams": {
"commit": "720fc5e5c8e428d9d0af618e1e27c44b42350309",
"dir": "vendor/nim-faststreams",
"url": "https://github.com/status-im/nim-faststreams"
},
"nim-http-utils": {
"commit": "3b491a40c60aad9e8d3407443f46f62511e63b18",
"dir": "vendor/nim-http-utils",
"url": "https://github.com/status-im/nim-http-utils"
},
"nim-json-rpc": {
"commit": "0bf2bcbe74a18a3c7a709d57108bb7b51e748a92",
"dir": "vendor/nim-json-rpc",
"url": "https://github.com/status-im/nim-json-rpc"
},
"nim-json-serialization": {
"commit": "bb53d49caf2a6c6cf1df365ba84af93cdcfa7aa3",
"dir": "vendor/nim-json-serialization",
"url": "https://github.com/status-im/nim-json-serialization"
},
"nim-leopard": {
"commit": "1a6f2ab7252426a6ac01482a68b75d0c3b134cf0",
"dir": "vendor/nim-leopard",
"url": "https://github.com/status-im/nim-leopard"
},
"nim-libbacktrace": {
"commit": "b29c22ba0ef13de50b779c776830dbea1d50cd33",
"dir": "vendor/nim-libbacktrace",
"url": "https://github.com/status-im/nim-libbacktrace"
},
"nim-libp2p": {
"commit": "440461b24b9e66542b34d26a0b908c17f6549d05",
"dir": "vendor/nim-libp2p",
"url": "https://github.com/status-im/nim-libp2p"
},
"nim-libp2p-dht": {
"commit": "fdd02450aa6979add7dabd29a3ba0f8738bf89f8",
"dir": "vendor/nim-libp2p-dht",
"url": "https://github.com/status-im/nim-libp2p-dht"
},
"nim-metrics": {
"commit": "6142e433fc8ea9b73379770a788017ac528d46ff",
"dir": "vendor/nim-metrics",
"url": "https://github.com/status-im/nim-metrics"
},
"nim-nat-traversal": {
"commit": "27d314d65c9078924b3239fe4e2f5af0c512b28c",
"dir": "vendor/nim-nat-traversal",
"url": "https://github.com/status-im/nim-nat-traversal"
},
"nim-nitro": {
"commit": "6b4c455bf4dad7449c1580055733a1738fcd5aec",
"dir": "vendor/nim-nitro",
"url": "https://github.com/status-im/nim-nitro"
},
"nim-presto": {
"commit": "3984431dc0fc829eb668e12e57e90542b041d298",
"dir": "vendor/nim-presto",
"url": "https://github.com/status-im/nim-presto"
},
"nim-protobuf-serialization": {
"commit": "28214b3e40c755a9886d2ec8f261ec48fbb6bec6",
"dir": "vendor/nim-protobuf-serialization",
"url": "https://github.com/status-im/nim-protobuf-serialization"
},
"nim-results": {
"commit": "f3c666a272c69d70cb41e7245e7f6844797303ad",
"dir": "vendor/nim-results",
"url": "https://github.com/arnetheduck/nim-results"
},
"nim-secp256k1": {
"commit": "2acbbdcc0e63002a013fff49f015708522875832",
"dir": "vendor/nim-secp256k1",
"url": "https://github.com/status-im/nim-secp256k1"
},
"nim-serialization": {
"commit": "384eb2561ee755446cff512a8e057325848b86a7",
"dir": "vendor/nim-serialization",
"url": "https://github.com/status-im/nim-serialization"
},
"nim-sqlite3-abi": {
"commit": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3",
"dir": "vendor/nim-sqlite3-abi",
"url": "https://github.com/arnetheduck/nim-sqlite3-abi"
},
"nim-stew": {
"commit": "7afe7e3c070758cac1f628e4330109f3ef6fc853",
"dir": "vendor/nim-stew",
"url": "https://github.com/status-im/nim-stew"
},
"nim-taskpools": {
"commit": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a",
"dir": "vendor/nim-taskpools",
"url": "https://github.com/status-im/nim-taskpools"
},
"nim-testutils": {
"commit": "b56a5953e37fc5117bd6ea6dfa18418c5e112815",
"dir": "vendor/nim-testutils",
"url": "https://github.com/status-im/nim-testutils"
},
"nim-toml-serialization": {
"commit": "86d477136f105f04bfd0dd7c0e939593d81fc581",
"dir": "vendor/nim-toml-serialization",
"url": "https://github.com/status-im/nim-toml-serialization"
},
"nim-unittest2": {
"commit": "b178f47527074964f76c395ad0dfc81cf118f379",
"dir": "vendor/nim-unittest2",
"url": "https://github.com/status-im/nim-unittest2"
},
"nim-websock": {
"commit": "2c3ae3137f3c9cb48134285bd4a47186fa51f0e8",
"dir": "vendor/nim-websock",
"url": "https://github.com/status-im/nim-websock"
},
"nim-zlib": {
"commit": "f34ca261efd90f118dc1647beefd2f7a69b05d93",
"dir": "vendor/nim-zlib",
"url": "https://github.com/status-im/nim-zlib"
},
"nim-stint": {
"dir": "vendor/stint",
"url": "https://github.com/status-im/nim-stint",
"commit": "86621eced1dcfb5e25903019ebcfc76ed9128ec5"
},
"nimcrypto": {
"commit": "24e006df85927f64916e60511620583b11403178",
"dir": "vendor/nimcrypto",
"url": "https://github.com/status-im/nimcrypto"
},
"npeg": {
"commit": "b15a10e388b91b898c581dbbcb6a718d46b27d2f",
"dir": "vendor/npeg",
"url": "https://github.com/zevv/npeg"
},
"questionable": {
"commit": "47692e0d923ada8f7f731275b2a87614c0150987",
"dir": "vendor/questionable",
"url": "https://github.com/codex-storage/questionable"
},
"upraises": {
"commit": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2",
"dir": "vendor/upraises",
"url": "https://github.com/markspanbroek/upraises"
}
},
"nimVersion": "1.6.14",
"nimbleFile": {
"content": "# Package\n\nversion = \"0.3.2\"\nauthor = \"Status Research & Development GmbH\"\ndescription = \"DHT based on the libp2p Kademlia spec\"\nlicense = \"MIT\"\nskipDirs = @[\"tests\"]\n\n\n# Dependencies\nrequires \"nim >= 1.2.0\"\nrequires \"secp256k1#2acbbdcc0e63002a013fff49f015708522875832\" # >= 0.5.2 & < 0.6.0\nrequires \"protobuf_serialization\" # >= 0.2.0 & < 0.3.0\nrequires \"nimcrypto == 0.5.4\"\nrequires \"bearssl#head\"\nrequires \"chronicles >= 0.10.2 & < 0.11.0\"\nrequires \"chronos == 3.2.0\" # >= 3.0.11 & < 3.1.0\nrequires \"libp2p#unstable\"\nrequires \"metrics\"\nrequires \"stew#head\"\nrequires \"stint\"\nrequires \"asynctest >= 0.3.1 & < 0.4.0\"\nrequires \"https://github.com/codex-storage/nim-datastore#head\"\nrequires \"questionable\"\n\ninclude \"build.nims\"\n\n",
"filename": ""
},
"nimcfg": "############# begin Atlas config section ##########\n--noNimblePath\n--path:\"vendor/nim-secp256k1\"\n--path:\"vendor/nim-protobuf-serialization\"\n--path:\"vendor/nimcrypto\"\n--path:\"vendor/nim-bearssl\"\n--path:\"vendor/nim-chronicles\"\n--path:\"vendor/nim-chronos\"\n--path:\"vendor/nim-libp2p\"\n--path:\"vendor/nim-metrics\"\n--path:\"vendor/nim-stew\"\n--path:\"vendor/nim-stint\"\n--path:\"vendor/asynctest\"\n--path:\"vendor/nim-datastore\"\n--path:\"vendor/questionable\"\n--path:\"vendor/nim-faststreams\"\n--path:\"vendor/nim-serialization\"\n--path:\"vendor/npeg/src\"\n--path:\"vendor/nim-unittest2\"\n--path:\"vendor/nim-testutils\"\n--path:\"vendor/nim-json-serialization\"\n--path:\"vendor/nim-http-utils\"\n--path:\"vendor/dnsclient.nim/src\"\n--path:\"vendor/nim-websock\"\n--path:\"vendor/nim-results\"\n--path:\"vendor/nim-sqlite3-abi\"\n--path:\"vendor/upraises\"\n--path:\"vendor/nim-zlib\"\n############# end Atlas config section ##########\n"
}

View File

@ -6,31 +6,4 @@ binDir = "build"
srcDir = "."
installFiles = @["build.nims"]
requires "nim >= 1.2.0"
requires "asynctest >= 0.5.1 & < 0.6.0"
requires "bearssl >= 0.1.4"
requires "chronicles >= 0.7.2"
requires "chronos >= 2.5.2"
requires "confutils"
requires "ethers >= 0.7.3 & < 0.8.0"
requires "libbacktrace"
requires "libp2p"
requires "metrics"
requires "nimcrypto >= 0.4.1"
requires "nitro >= 0.5.1 & < 0.6.0"
requires "presto"
requires "protobuf_serialization >= 0.2.0 & < 0.3.0"
requires "questionable >= 0.10.13 & < 0.11.0"
requires "secp256k1"
requires "serde >= 1.0.0 & < 2.0.0"
requires "stew"
requires "upraises >= 0.1.0 & < 0.2.0"
requires "toml_serialization"
requires "https://github.com/status-im/lrucache.nim#1.2.2"
requires "leopard >= 0.1.0 & < 0.2.0"
requires "blscurve"
requires "libp2pdht"
requires "eth"
requires "https://github.com/codex-storage/nim-poseidon2.git >= 0.1.0 & < 0.2.0"
include "build.nims"

View File

@ -258,7 +258,7 @@ proc new*(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota.uint,
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)
maintenance = BlockMaintainer.new(
@ -312,7 +312,7 @@ proc new*(
taskpool = taskpool)
restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore),
codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
initTAddress(config.apiBindAddress , config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high)

View File

@ -191,6 +191,12 @@ type
name: "api-port"
abbr: "p" }: Port
apiCorsAllowedOrigin* {.
desc: "The REST Api CORS allowed origin for downloading data. '*' will allow all origins, '' will allow none.",
defaultValue: string.none
defaultValueDesc: "Disallow all cross origin requests to download data"
name: "api-cors-origin" }: Option[string]
repoKind* {.
desc: "Backend for main repo store (fs, sqlite, leveldb)"
defaultValueDesc: "fs"

View File

@ -49,8 +49,8 @@ func getLinearIndicies(
self.checkIteration(iteration)
let
first = self.firstIndex + iteration * (self.step + 1)
last = min(first + self.step, self.lastIndex)
first = self.firstIndex + iteration * self.step
last = min(first + self.step - 1, self.lastIndex)
getIter(first, last, 1)
@ -94,4 +94,4 @@ func init*(
firstIndex: firstIndex,
lastIndex: lastIndex,
iterations: iterations,
step: divUp((lastIndex - firstIndex), iterations))
step: divUp((lastIndex - firstIndex + 1), iterations))

View File

@ -135,13 +135,6 @@ func isManifest*(mc: MultiCodec): ?!bool =
# Various sizes and verification
############################################################
func bytes*(self: Manifest, pad = true): NBytes =
## Compute how many bytes corresponding StoreStream(Manifest, pad) will return
if pad or self.protected:
self.blocksCount.NBytes * self.blockSize
else:
self.datasetSize
func rounded*(self: Manifest): int =
## Number of data blocks in *protected* manifest including padding at the end
roundUp(self.originalBlocksCount, self.ecK)
@ -238,7 +231,7 @@ func new*(
treeCid: Cid,
datasetSize: NBytes,
ecK, ecM: int,
strategy: StrategyType): Manifest =
strategy = SteppedStrategy): Manifest =
## Create an erasure protected dataset from an
## unprotected one
##
@ -284,7 +277,7 @@ func new*(
ecM: int,
originalTreeCid: Cid,
originalDatasetSize: NBytes,
strategy: StrategyType): Manifest =
strategy = SteppedStrategy): Manifest =
Manifest(
treeCid: treeCid,
@ -306,7 +299,7 @@ func new*(
verifyRoot: Cid,
slotRoots: openArray[Cid],
cellSize = DefaultCellSize,
strategy = SteppedStrategy): ?!Manifest =
strategy = LinearStrategy): ?!Manifest =
## Create a verifiable dataset from an
## protected one
##
@ -331,6 +324,7 @@ func new*(
ecM: manifest.ecM,
originalTreeCid: manifest.treeCid,
originalDatasetSize: manifest.originalDatasetSize,
protectedStrategy: manifest.protectedStrategy,
verifiable: true,
verifyRoot: verifyRoot,
slotRoots: @slotRoots,

View File

@ -14,6 +14,8 @@ push: {.upraises: [].}
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/serde/json
import ../../units
import ../../errors
@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof =
nodes.add node
CodexProof.init(mcodec, index.int, nleaves.int, nodes)
proc fromJson*(
_: type CodexProof,
json: JsonNode
): ?!CodexProof =
expectJsonKind(Cid, JString, json)
var bytes: seq[byte]
try:
bytes = hexToSeqByte(json.str)
except ValueError as err:
return failure(err)
CodexProof.decode(bytes)
func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode())

View File

@ -560,7 +560,9 @@ proc onStore(
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)
without builder =? Poseidon2Builder.new(self.networkStore, manifest), err:
without builder =? Poseidon2Builder.new(
self.networkStore, manifest, manifest.verifiableStrategy
), err:
trace "Unable to create slots builder", err = err.msg
return failure(err)
@ -585,8 +587,8 @@ proc onStore(
return success()
without indexer =? manifest.protectedStrategy.init(
0, manifest.numSlotBlocks() - 1, manifest.numSlots).catch, err:
without indexer =? manifest.verifiableStrategy.init(
0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg
return failure(err)

View File

@ -107,6 +107,8 @@ proc retrieveCid(
await stream.close()
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion
router.rawApi(
MethodPost,
"/api/codex/v1/data") do (
@ -166,6 +168,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
Http400,
$cid.error())
if corsOrigin =? allowedOrigin:
resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
resp.setHeader("Access-Control-Headers", "X-Requested-With")
resp.setHeader("Access-Control-Max-Age", "86400")
await node.retrieveCid(cid.get(), local = true, resp=resp)
router.api(
@ -181,6 +189,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
Http400,
$cid.error())
if corsOrigin =? allowedOrigin:
resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
resp.setHeader("Access-Control-Headers", "X-Requested-With")
resp.setHeader("Access-Control-Max-Age", "86400")
await node.retrieveCid(cid.get(), local = false, resp=resp)
router.api(
@ -664,9 +678,13 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500)
proc initRestApi*(
node: CodexNodeRef,
conf: CodexConf,
repoStore: RepoStore,
corsAllowedOrigin: ?string): RestRouter =
proc initRestApi*(node: CodexNodeRef, conf: CodexConf, repoStore: RepoStore): RestRouter =
var router = RestRouter.init(validate)
var router = RestRouter.init(validate, corsAllowedOrigin)
initDataApi(node, repoStore, router)
initSalesApi(node, router)

View File

@ -7,6 +7,7 @@ import ../sales
import ../purchasing
import ../utils/json
import ../manifest
import ../units
export json
@ -65,10 +66,10 @@ type
id*: NodeId
RestRepoStore* = object
totalBlocks* {.serialize.}: uint
quotaMaxBytes* {.serialize.}: uint
quotaUsedBytes* {.serialize.}: uint
quotaReservedBytes* {.serialize.}: uint
totalBlocks* {.serialize.}: Natural
quotaMaxBytes* {.serialize.}: NBytes
quotaUsedBytes* {.serialize.}: NBytes
quotaReservedBytes* {.serialize.}: NBytes
proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList =
RestContentList(

View File

@ -46,6 +46,7 @@ import ../stores
import ../market
import ../contracts/requests
import ../utils/json
import ../units
export requests
export logutils
@ -53,6 +54,7 @@ export logutils
logScope:
topics = "sales reservations"
type
AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte]
@ -71,7 +73,8 @@ type
size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
Reservations* = ref object
Reservations* = ref object of RootObj
availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
@ -95,12 +98,22 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
template withLock(lock, body) =
try:
await lock.acquire()
body
finally:
if lock.locked:
lock.release()
proc new*(T: type Reservations,
repo: RepoStore): Reservations =
T(repo: repo)
T(availabilityLock: newAsyncLock(),repo: repo)
proc init*(
_: type Availability,
@ -166,16 +179,16 @@ func key*(availability: Availability): ?!Key =
func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId)
func available*(self: Reservations): uint = self.repo.available
func available*(self: Reservations): uint = self.repo.available.uint
func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes)
self.repo.available(bytes.NBytes)
proc exists*(
self: Reservations,
key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key)
let exists = await self.repo.metaDs.ds.contains(key)
return exists
proc getImpl(
@ -186,7 +199,7 @@ proc getImpl(
let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err)
without serialized =? await self.repo.metaDs.get(key), error:
without serialized =? await self.repo.metaDs.ds.get(key), error:
return failure(error.toErr(GetFailedError))
return success serialized
@ -213,7 +226,7 @@ proc updateImpl(
without key =? obj.key, error:
return failure(error)
if err =? (await self.repo.metaDs.put(
if err =? (await self.repo.metaDs.ds.put(
key,
@(obj.toJson.toBytes)
)).errorOption:
@ -221,20 +234,19 @@ proc updateImpl(
return success()
proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
proc updateAvailability(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =
logScope:
availabilityId = obj.id
without key =? obj.key, error:
return failure(error)
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
@ -248,20 +260,20 @@ proc update*(
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
return res
else:
return failure(err)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
@ -280,11 +292,21 @@ proc update*(
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
return res
proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
proc delete(
self: Reservations,
key: Key): Future[?!void] {.async.} =
@ -294,7 +316,7 @@ proc delete(
if not await self.exists(key):
return success()
if err =? (await self.repo.metaDs.delete(key)).errorOption:
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
@ -312,31 +334,32 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error:
return failure(error)
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
withLock(self.availabilityLock):
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += reservation.size
availability.freeSize += reservation.size
if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
return success()
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
@ -355,14 +378,14 @@ proc createAvailability*(
)
let bytes = availability.freeSize.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption:
# rollback the reserve
trace "rolling back reserve"
if rollbackErr =? (await self.repo.release(bytes)).errorOption:
if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
@ -370,54 +393,57 @@ proc createAvailability*(
return success(availability)
proc createReservation*(
method createReservation*(
self: Reservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Future[?!Reservation] {.async.} =
): Future[?!Reservation] {.async, base.} =
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without availabilityKey =? availabilityId.key, error:
return failure(error)
# Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation
availability.freeSize -= slotSize
# update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption:
# update availability with reduced size
trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
trace "rolling back reservation creation"
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return failure(updateErr)
return success(reservation)
trace "Reservation succesfully created"
return success(reservation)
proc returnBytesToAvailability*(
self: Reservations,
@ -429,48 +455,48 @@ proc returnBytesToAvailability*(
reservationId
availabilityId
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success()
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.update(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
proc release*(
self: Reservations,
reservationId: ReservationId,
@ -497,7 +523,7 @@ proc release*(
"trying to release an amount of bytes that is greater than the total size of the Reservation")
return failure(error)
if releaseErr =? (await self.repo.release(bytes)).errorOption:
if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError))
reservation.size -= bytes.u256
@ -507,7 +533,7 @@ proc release*(
# rollback release if an update error encountered
trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
rollbackErr.parent = err
return failure(rollbackErr)
return failure(err)
@ -537,7 +563,7 @@ proc storables(
else:
raiseAssert "unknown type"
without results =? await self.repo.metaDs.query(query), error:
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)
# /sales/reservations
@ -621,6 +647,7 @@ proc findAvailability*(
minPrice >= availability.minPrice:
trace "availability matched",
id = availability.id,
size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
@ -635,8 +662,8 @@ proc findAvailability*(
return some availability
trace "availability did not match",
id = availability.id,
size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
collateral, availMaxCollateral = availability.maxCollateral

View File

@ -1,5 +1,6 @@
import pkg/questionable
import pkg/questionable/results
import pkg/metrics
import ../../logutils
import ../../market
@ -13,6 +14,8 @@ import ./ignored
import ./downloading
import ./errored
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
type
SalePreparing* = ref object of ErrorHandlingState
@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
request.id,
data.slotIndex
), error:
trace "Creation of reservation failed"
# Race condition:
# reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it.
# Should createReservation fail because there's no space, we proceed to SaleIgnored.
if error of BytesOutOfBoundsError:
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
codex_reservations_availability_mismatch.inc()
return some State(SaleIgnored())
return some State(SaleErrored(error: error))
trace "Reservation created succesfully"
data.reservation = some reservation
return some State(SaleDownloading())

View File

@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.}
trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now:
if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid)
else:
inc self.offset
@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
return
var numberReceived = 0
for maybeBeFuture in iter:
if be =? await maybeBeFuture:
inc numberReceived
await self.processBlockExpiration(be)
await sleepAsync(50.millis)
for beFut in iter:
let be = await beFut
inc numberReceived
await self.processBlockExpiration(be)
await sleepAsync(1.millis) # cooperative scheduling
# If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time.

View File

@ -6,7 +6,7 @@ import pkg/datastore/typedds
import ../utils/asynciter
type KeyVal[T] = tuple[key: Key, value: T]
type KeyVal*[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T](
queryIter: QueryIter[T],

View File

@ -1,679 +1,5 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import ./repostore/store
import ./repostore/types
import ./repostore/coders
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronos/futures
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ../blocktype
import ../clock
import ../systemclock
import ../logutils
import ../merkletree
import ../utils
export blocktype, cid
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
type
QuotaUsedError* = object of CodexError
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: Datastore
clock: Clock
totalBlocks*: uint # number of blocks in the store
quotaMaxBytes*: uint # maximum available bytes
quotaUsedBytes*: uint # bytes used by the repo
quotaReservedBytes*: uint # bytes reserved by the repo
blockTtl*: Duration
started*: bool
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): uint =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE
@nBytes & cidBytes & proofBytes
proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? CodexProof.decode(data[sizeof(uint64) + n..^1])
success((cid, proof))
proc decodeCid(_: type (Cid, CodexProof), data: seq[byte]): ?!Cid =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
success(cid)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blockCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Storing block cid and proof", blockCid, key
let value = (blockCid, proof).encode()
await self.metaDs.put(key, value)
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!(Cid, CodexProof)] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
error "Unable to decode cid and proof", err = err.msg
return failure(err)
return success (cid, proof)
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!Cid] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
# This failure is expected to happen frequently:
# NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine.
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Error getting cid from datastore", err = err.msg, key
return failure(err)
return (Cid, CodexProof).decodeCid(value)
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
error "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
error "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
let (cid, proof) = cidAndProof
without blk =? await self.getBlock(cid), err:
return failure(err)
success((blk, proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
return failure(err)
await self.getBlock(cid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: SecondsSince1970): ?!BatchEntry =
## Get an expiration entry for a batch with timestamp
##
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return success((key, ttl.toBytes))
proc getBlockExpirationEntry(
self: RepoStore,
cid: Cid,
ttl: ?Duration): ?!BatchEntry =
## Get an expiration entry for a batch for duration since "now"
##
let duration = ttl |? self.blockTtl
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
logScope:
cid = cid
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
without expiryKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without currentExpiry =? await self.metaDs.get(expiryKey), err:
if err of DatastoreKeyNotFound:
error "No current expiry exists for the block"
return failure(newException(BlockNotFoundError, err.msg))
else:
error "Could not read datastore key", err = err.msg
return failure(err)
logScope:
current = currentExpiry.toSecondsSince1970
ensuring = expiry
if expiry <= currentExpiry.toSecondsSince1970:
trace "Expiry is larger than or equal to requested"
return success()
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
trace "Error updating expiration metadata entry", err = err.msg
return failure(err)
return success()
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
await self.ensureExpiry(cidAndProof[0], expiry)
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
if err =? (await self.metaDs.put(
CodexTotalBlocksKey,
@(self.totalBlocks.uint64.toBytesBE))).errorOption:
trace "Error total blocks key!", err = err.msg
return failure(err)
return success()
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
without key =? makePrefixKey(self.postFixLen, blk.cid), err:
warn "Error getting key from provider", err = err.msg
return failure(err)
if await key in self.repoDs:
trace "Block already in store", cid = blk.cid
return success()
if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes:
error "Cannot store block, quota used!", used = self.totalUsed
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))
var
batch: seq[BatchEntry]
let
used = self.quotaUsedBytes + blk.data.len.uint
if err =? (await self.repoDs.put(key, blk.data)).errorOption:
error "Error storing block", err = err.msg
return failure(err)
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
warn "Unable to create block expiration metadata key", err = err.msg
return failure(err)
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption:
error "Error updating quota bytes", err = err.msg
if err =? (await self.repoDs.delete(key)).errorOption:
error "Error deleting block after failed quota update", err = err.msg
return failure(err)
return failure(err)
self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
warn "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
let used = self.quotaUsedBytes - blk.data.len.uint
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
self.updateMetrics()
return success()
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return await self.metaDs.delete(key)
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
logScope:
cid = cid
trace "Deleting block"
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
if blk =? (await self.getBlock(cid)):
if key =? makePrefixKey(self.postFixLen, cid) and
err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block!", err = err.msg
return failure(err)
if isErr (await self.updateQuotaBytesUsed(blk)):
trace "Unable to update quote-bytes-used in metadata store"
return failure("Unable to update quote-bytes-used in metadata store")
if isErr (await self.removeBlockExpirationEntry(blk.cid)):
trace "Unable to remove block expiration entry from metadata store"
return failure("Unable to remove block expiration entry from metadata store")
trace "Deleted block", cid, totalUsed = self.totalUsed
dec self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Fetching proof", key
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return success()
else:
return failure(err)
without cid =? (Cid, CodexProof).decodeCid(value), err:
return failure(err)
trace "Deleting block", cid
if err =? (await self.delBlock(cid)).errorOption:
return failure(err)
await self.metaDs.delete(key)
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without cid =? await self.getCid(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(cid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
## Get block expirations from the given RepoStore
##
without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query"
return failure(err)
without queryIter =? (await self.metaDs.query(query)), err:
trace "Unable to execute block expirations query"
return failure(err)
var iter = AsyncIter[?BlockExpiration]()
proc next(): Future[?BlockExpiration] {.async.} =
if not queryIter.finished:
if pair =? (await queryIter.next()) and blockKey =? pair.key:
let expirationTimestamp = pair.data
let cidResult = Cid.init(blockKey.value)
if not cidResult.isOk:
raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
return BlockExpiration(
cid: cidResult.get,
expiration: expirationTimestamp.toSecondsSince1970
).some
else:
discard await queryIter.dispose()
iter.finish
return BlockExpiration.none
iter.next = next
return success iter
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes
if (self.totalUsed + bytes) > self.quotaMaxBytes:
trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes
return failure(
newException(QuotaNotEnoughError, "Not enough storage quota to reserver"))
self.quotaReservedBytes += bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error reserving bytes", err = err.msg
self.quotaReservedBytes += bytes
return failure(err)
return success()
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes
if (self.quotaReservedBytes.int - bytes.int) < 0:
trace "Cannot release this many bytes",
quotaReservedBytes = self.quotaReservedBytes, bytes
return failure("Cannot release this many bytes")
self.quotaReservedBytes -= bytes
if err =? (await self.metaDs.put(
QuotaReservedKey,
@(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
trace "Error releasing bytes", err = err.msg
self.quotaReservedBytes -= bytes
return failure(err)
trace "Released bytes", bytes
self.updateMetrics()
return success()
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting repo"
without total =? await self.metaDs.get(CodexTotalBlocksKey), err:
if not (err of DatastoreKeyNotFound):
error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey
if total.len > 0:
self.totalBlocks = uint64.fromBytesBE(total).uint
trace "Number of blocks in store at start", total = self.totalBlocks
## load current persist and cache bytes from meta ds
without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting cache bytes from datastore",
err = err.msg, key = $QuotaUsedKey
raise newException(Defect, err.msg)
if quotaUsedBytes.len > 0:
self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint
notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes
without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err:
if not (err of DatastoreKeyNotFound):
error "Error getting persist bytes from datastore",
err = err.msg, key = $QuotaReservedKey
raise newException(Defect, err.msg)
if quotaReservedBytes.len > 0:
self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint
if self.quotaUsedBytes > self.quotaMaxBytes:
raiseAssert "All storage quota used, increase storage quota!"
notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes
self.updateMetrics()
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: metaDs,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)
export store, types, coders

View File

@ -0,0 +1,47 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
##
import std/sugar
import pkg/libp2p/cid
import pkg/serde/json
import pkg/stew/byteutils
import pkg/stew/endians2
import ./types
import ../../errors
import ../../merkletree
import ../../utils/json
proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes()
proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes()
proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes()
proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc encode*(i: uint64): seq[byte] =
@(i.toBytesBE)
proc decode*(T: type uint64, bytes: seq[byte]): ?!T =
if bytes.len >= sizeof(uint64):
success(uint64.fromBytesBE(bytes))
else:
failure("Not enough bytes to decode `uint64`")
proc encode*(i: Natural | enum): seq[byte] = cast[uint64](i).encode
proc decode*(T: typedesc[Natural | enum], bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[T](ui))

View File

@ -0,0 +1,213 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ../blockstore
import ../keyutils
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
logScope:
topics = "codex repostore"
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
await self.metaDs.modifyGet(key,
proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} =
var
md: LeafMetadata
res: StoreResultKind
if currMd =? maybeCurrMd:
md = currMd
res = AlreadyInStore
else:
md = LeafMetadata(blkCid: blkCid, proof: proof)
res = Stored
(md.some, res)
)
proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
without leafMd =? await get[LeafMetadata](self.metaDs, key), err:
if err of DatastoreKeyNotFound:
return failure(newException(BlockNotFoundError, err.msg))
else:
return failure(err)
success(leafMd)
proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} =
await self.metaDs.modify(CodexTotalBlocksKey,
proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
currCount + plusCount - minusCount
else:
plusCount - minusCount
self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
count.some
)
proc updateQuotaUsage*(
self: RepoStore,
plusUsed: NBytes = 0.NBytes,
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes
): Future[?!void] {.async.} =
await self.metaDs.modify(QuotaUsedKey,
proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
var usage: QuotaUsage
if currUsage =? maybeCurrUsage:
usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved)
else:
usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
if usage.used + usage.reserved > self.quotaMaxBytes:
raise newException(QuotaNotEnoughError,
"Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
$usage.reserved & ", limit: " & $self.quotaMaxBytes)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some
)
proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,
plusRefCount: Natural = 0,
minusRefCount: Natural = 0,
minExpiry: SecondsSince1970 = 0
): Future[?!void] {.async.} =
if cid.isEmpty:
return success()
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
await self.metaDs.modify(metaKey,
proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} =
if currBlockMd =? maybeCurrBlockMd:
BlockMetadata(
size: currBlockMd.size,
expiry: max(currBlockMd.expiry, minExpiry),
refCount: currBlockMd.refCount + plusRefCount - minusRefCount
).some
else:
raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found")
)
proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} =
var
md: BlockMetadata
res: StoreResult
if currMd =? maybeCurrMd:
if currMd.size == blk.data.len.NBytes:
md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount)
res = StoreResult(kind: AlreadyInStore)
# making sure that the block acutally is stored in the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if not hasBlock:
warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
else:
raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid)
else:
md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0)
res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
(md.some, res)
)
proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} =
if cid.isEmpty:
return success(DeleteResult(kind: InUse))
without metaKey =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, cid), err:
return failure(err)
await self.metaDs.modifyGet(metaKey,
proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} =
var
maybeMeta: ?BlockMetadata
res: DeleteResult
if currMd =? maybeCurrMd:
if currMd.refCount == 0 or currMd.expiry < expiryLimit:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: Deleted, released: currMd.size)
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
else:
maybeMeta = currMd.some
res = DeleteResult(kind: InUse)
else:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: NotFound)
# making sure that the block acutally is removed from the repoDs
without hasBlock =? await self.repoDs.has(blkKey), err:
raise err
if hasBlock:
warn "Block metadata is absent, but block is present. Removing block.", cid
if err =? (await self.repoDs.delete(blkKey)).errorOption:
raise err
(maybeMeta, res)
)

View File

@ -0,0 +1,395 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronos/futures
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/[cid, multicodec]
import pkg/questionable
import pkg/questionable/results
import ./coders
import ./types
import ./operations
import ../blockstore
import ../keyutils
import ../queryiterhelper
import ../../blocktype
import ../../clock
import ../../logutils
import ../../merkletree
import ../../utils
export blocktype, cid
logScope:
topics = "codex repostore"
###########################################################
# BlockStore API
###########################################################
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
trace "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
without blk =? await self.getBlock(leafMd.blkCid), err:
return failure(err)
success((blk, leafMd.proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.getBlock(leafMd.blkCid)
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
await self.updateBlockMetadata(cid, minExpiry = expiry)
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural,
blkCid: Cid,
proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
treeCid = treeCid
index = index
blkCid = blkCid
trace "Storing LeafMetadata"
without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err:
return failure(err)
if blkCid.mcodec == BlockCodec:
if res == Stored:
if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
return failure(err)
trace "Leaf metadata stored, block refCount incremented"
else:
trace "Leaf metadata already exists"
return success()
method getCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!(Cid, CodexProof)] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success((leafMd.blkCid, leafMd.proof))
method getCid*(
self: RepoStore,
treeCid: Cid,
index: Natural
): Future[?!Cid] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
success(leafMd.blkCid)
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds
without res =? await self.storeBlock(blk, expiry), err:
return failure(err)
if res.kind == Stored:
trace "Block Stored"
if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# rollback changes
without delRes =? await self.tryDeleteBlock(blk.cid), err:
return failure(err)
return failure(err)
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err)
else:
trace "Block already exists"
return success()
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore when block refCount is 0 or block is expired
##
logScope:
cid = cid
trace "Attempting to delete a block"
without res =? await self.tryDeleteBlock(cid, self.clock.now()), err:
return failure(err)
if res.kind == Deleted:
trace "Block deleted"
if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
return failure(err)
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
return failure(err)
elif res.kind == InUse:
trace "Block in use, refCount > 0 and not expired"
else:
trace "Block not found in store"
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success()
else:
return failure(err)
if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption:
if not (err of BlockNotFoundError):
return failure(err)
await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(leafMd.blkCid)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!AsyncIter[?Cid]] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let key =
case blockType:
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
let query = Query.init(key, value=false)
without queryIter =? (await self.repoDs.query(query)), err:
trace "Error querying cids in repo", blockType, err = err.msg
return failure(err)
proc next(): Future[?Cid] {.async.} =
await idleAsync()
if queryIter.finished:
iter.finish
else:
if pair =? (await queryIter.next()) and cid =? pair.key:
doAssert pair.data.len == 0
trace "Retrieved record from repo", cid
return Cid.init(cid.value).option
else:
return Cid.none
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
## Get iterator with block expirations
##
without beQuery =? createBlockExpirationQuery(maxNumber, offset), err:
error "Unable to format block expirations query", err = err.msg
return failure(err)
without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter,
proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
trace "Closing repostore"
if not self.metaDs.isNil:
(await self.metaDs.close()).expect("Should meta datastore")
if not self.repoDs.isNil:
(await self.repoDs.close()).expect("Should repo datastore")
###########################################################
# RepoStore procs
###########################################################
proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Reserve bytes
##
trace "Reserving bytes", bytes
await self.updateQuotaUsage(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Release bytes
##
trace "Releasing bytes", bytes
await self.updateQuotaUsage(minusReserved = bytes)
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
##
if self.started:
trace "Repo already started"
return
trace "Starting rep"
if err =? (await self.updateTotalBlocksCount()).errorOption:
raise newException(CodexError, err.msg)
if err =? (await self.updateQuotaUsage()).errorOption:
raise newException(CodexError, err.msg)
self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if not self.started:
trace "Repo is not started"
return
trace "Stopping repo"
await self.close()
self.started = false

View File

@ -0,0 +1,107 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/datastore
import pkg/datastore/typedds
import pkg/libp2p/cid
import ../blockstore
import ../../clock
import ../../errors
import ../../merkletree
import ../../systemclock
import ../../units
const
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 8.GiBs
type
QuotaNotEnoughError* = object of CodexError
RepoStore* = ref object of BlockStore
postFixLen*: int
repoDs*: Datastore
metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage
totalBlocks*: Natural
blockTtl*: Duration
started*: bool
QuotaUsage* {.serialize.} = object
used*: NBytes
reserved*: NBytes
BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
size*: NBytes
refCount*: Natural
LeafMetadata* {.serialize.} = object
blkCid*: Cid
proof*: CodexProof
BlockExpiration* {.serialize.} = object
cid*: Cid
expiry*: SecondsSince1970
DeleteResultKind* {.serialize.} = enum
Deleted = 0, # block removed from store
InUse = 1, # block not removed, refCount > 0 and not expired
NotFound = 2 # block not found in store
DeleteResult* {.serialize.} = object
kind*: DeleteResultKind
released*: NBytes
StoreResultKind* {.serialize.} = enum
Stored = 0, # new block stored
AlreadyInStore = 1 # block already in store
StoreResult* {.serialize.} = object
kind*: StoreResultKind
used*: NBytes
func quotaUsedBytes*(self: RepoStore): NBytes =
self.quotaUsage.used
func quotaReservedBytes*(self: RepoStore): NBytes =
self.quotaUsage.reserved
func totalUsed*(self: RepoStore): NBytes =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): NBytes =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: NBytes): bool =
return bytes < self.available()
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: TypedDatastore.init(metaDs),
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)

View File

@ -38,7 +38,6 @@ type
StoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
method initStream*(s: StoreStream) =
if s.objName.len == 0:
@ -57,13 +56,15 @@ proc new*(
result = StoreStream(
store: store,
manifest: manifest,
pad: pad,
offset: 0)
result.initStream()
method `size`*(self: StoreStream): int =
bytes(self.manifest, self.pad).int
## The size of a StoreStream is the size of the original dataset, without
## padding or parity blocks.
let m = self.manifest
(if m.protected: m.originalDatasetSize else: m.datasetSize).int
proc `size=`*(self: StoreStream, size: int)
{.error: "Setting the size is forbidden".} =

View File

@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes
logutils.formatIt(NBytes): $it
const
MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz
KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz
MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz
GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz
proc KiBs*(v: Natural): NBytes = v.NBytes * KiB
proc MiBs*(v: Natural): NBytes = v.NBytes * MiB
proc GiBs*(v: Natural): NBytes = v.NBytes * GiB
func divUp*[T: NBytes](a, b : T): int =
## Division with result rounded up (rather than truncated as in 'div')

View File

@ -1,481 +0,0 @@
{
"version": 1,
"packages": {
"stew": {
"version": "0.1.0",
"vcsRevision": "6ad35b876fb6ebe0dfee0f697af173acc47906ee",
"url": "https://github.com/status-im/nim-stew.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "46d58c4feb457f3241e3347778334e325dce5268"
}
},
"unittest2": {
"version": "0.0.4",
"vcsRevision": "f180f596c88dfd266f746ed6f8dbebce39c824db",
"url": "https://github.com/status-im/nim-unittest2.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "fa309c41eaf6ef57895b9e603f2620a2f6e11780"
}
},
"httputils": {
"version": "0.3.0",
"vcsRevision": "689da19e9e9cfff4ced85e2b25c6b2b5598ed079",
"url": "https://github.com/status-im/nim-http-utils.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "4ad3ad68d13c50184180ab4b2eacc0bd7ed2ed44"
}
},
"nimcrypto": {
"version": "0.5.4",
"vcsRevision": "a5742a9a214ac33f91615f3862c7b099aec43b00",
"url": "https://github.com/cheatfate/nimcrypto.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "f76c87707cd4e96355b8bb6ef27e7f8b0aac1e08"
}
},
"questionable": {
"version": "0.10.2",
"vcsRevision": "6018fd43e033d5a5310faa45bcaa1b44049469a4",
"url": "https://github.com/status-im/questionable.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "36a6c012637c7736a390e74a7f94667bca562073"
}
},
"upraises": {
"version": "0.1.0",
"vcsRevision": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2",
"url": "https://github.com/markspanbroek/upraises.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a0243c8039e12d547dbb2e9c73789c16bb8bc956"
}
},
"secp256k1": {
"version": "0.5.2",
"vcsRevision": "5340cf188168d6afcafc8023770d880f067c0b2f",
"url": "https://github.com/status-im/nim-secp256k1.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"nimcrypto"
],
"checksums": {
"sha1": "ae9cbea4487be94a06653ffee075a7f1bd1e231e"
}
},
"stint": {
"version": "0.0.1",
"vcsRevision": "036c71d06a6b22f8f967ba9d54afd2189c3872ca",
"url": "https://github.com/status-im/stint.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "0f187a2115315ca898e5f9a30c5e506cf6057062"
}
},
"contractabi": {
"version": "0.4.4",
"vcsRevision": "b111c27b619fc1d81fb1c6942372824a18a71960",
"url": "https://github.com/status-im/nim-contract-abi",
"downloadMethod": "git",
"dependencies": [
"stint",
"stew",
"nimcrypto",
"questionable",
"upraises"
],
"checksums": {
"sha1": "3ed10b11eec8fe14a81e4e58dbc41f88fd6ddf7a"
}
},
"nitro": {
"version": "0.5.1",
"vcsRevision": "6b4c455bf4dad7449c1580055733a1738fcd5aec",
"url": "https://github.com/status-im/nim-nitro.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"questionable",
"upraises",
"contractabi",
"secp256k1",
"stint",
"stew"
],
"checksums": {
"sha1": "19d90deaeb84b19214dc2aab28a466f0bc4a7e2e"
}
},
"bearssl": {
"version": "0.1.5",
"vcsRevision": "32e125015ae4251675763842366380795a91b722",
"url": "https://github.com/status-im/nim-bearssl.git",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "c58a61e71c49ed7c7fe7608df40d60945f7c4bad"
}
},
"chronos": {
"version": "3.0.11",
"vcsRevision": "17fed89c99beac5a92d3668d0d3e9b0e4ac13936",
"url": "https://github.com/status-im/nim-chronos.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"bearssl",
"httputils",
"unittest2"
],
"checksums": {
"sha1": "f6fffc87571e5f76af2a77c4ebcc0e00909ced4e"
}
},
"testutils": {
"version": "0.4.2",
"vcsRevision": "aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2",
"url": "https://github.com/status-im/nim-testutils",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "94427e0cce0e0c5841edcd3a6530b4e6b857a3cb"
}
},
"faststreams": {
"version": "0.3.0",
"vcsRevision": "1b561a9e71b6bdad1c1cdff753418906037e9d09",
"url": "https://github.com/status-im/nim-faststreams.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"testutils",
"chronos",
"unittest2"
],
"checksums": {
"sha1": "97edf9797924af48566a0af8267203dc21d80c77"
}
},
"serialization": {
"version": "0.1.0",
"vcsRevision": "fcd0eadadde0ee000a63df8ab21dc4e9f015a790",
"url": "https://github.com/status-im/nim-serialization.git",
"downloadMethod": "git",
"dependencies": [
"faststreams",
"unittest2",
"stew"
],
"checksums": {
"sha1": "fef59519892cac70cccd81b612085caaa5e3e6cf"
}
},
"json_serialization": {
"version": "0.1.0",
"vcsRevision": "c5f0e2465e8375dfc7aa0f56ccef67cb680bc6b0",
"url": "https://github.com/status-im/nim-json-serialization.git",
"downloadMethod": "git",
"dependencies": [
"serialization",
"stew"
],
"checksums": {
"sha1": "d89d79d0679a3a41b350e3ad4be56c0308cc5ec6"
}
},
"chronicles": {
"version": "0.10.2",
"vcsRevision": "1682096306ddba8185dcfac360a8c3f952d721e4",
"url": "https://github.com/status-im/nim-chronicles.git",
"downloadMethod": "git",
"dependencies": [
"testutils",
"json_serialization"
],
"checksums": {
"sha1": "9a5bebb76b0f7d587a31e621d260119279e91c76"
}
},
"presto": {
"version": "0.0.4",
"vcsRevision": "962bb588d19c7180e39f0d9f18131e75861bab20",
"url": "https://github.com/status-im/nim-presto.git",
"downloadMethod": "git",
"dependencies": [
"chronos",
"chronicles",
"stew"
],
"checksums": {
"sha1": "8d3e77d7ddf14606504fe86c430b1b5712aada92"
}
},
"zlib": {
"version": "0.1.0",
"vcsRevision": "74cdeb54b21bededb5a515d36f608bc1850555a2",
"url": "https://github.com/status-im/nim-zlib",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "01d330dc4c1924e56b1559ee73bc760e526f635c"
}
},
"libbacktrace": {
"version": "0.0.8",
"vcsRevision": "ce966b1c469dda179b54346feaaf1a62202c984f",
"url": "https://github.com/status-im/nim-libbacktrace",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "ba7a2f3d21db894ace7bb4ebe0a5b06af995d68b"
}
},
"dnsclient": {
"version": "0.1.2",
"vcsRevision": "fbb76f8af8a33ab818184a7d4406d9fee20993be",
"url": "https://github.com/ba0f3/dnsclient.nim.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "663239a914c814204b30dda6e0902cc0fbd0b8ee"
}
},
"metrics": {
"version": "0.0.1",
"vcsRevision": "743f81d4f6c6ebf0ac02389f2392ff8b4235bee5",
"url": "https://github.com/status-im/nim-metrics.git",
"downloadMethod": "git",
"dependencies": [
"chronos"
],
"checksums": {
"sha1": "6274c7ae424b871bc21ca3a6b6713971ff6a8095"
}
},
"asynctest": {
"version": "0.3.1",
"vcsRevision": "5347c59b4b057443a014722aa40800cd8bb95c69",
"url": "https://github.com/status-im/asynctest.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "53e0b610d13700296755a4ebe789882cae47a3b9"
}
},
"websock": {
"version": "0.1.0",
"vcsRevision": "8a433c6ba43940b13ce56f83d79a93273ece5684",
"url": "https://github.com/status-im/nim-websock.git",
"downloadMethod": "git",
"dependencies": [
"chronos",
"httputils",
"chronicles",
"stew",
"nimcrypto",
"bearssl",
"zlib"
],
"checksums": {
"sha1": "1dbb4e1dd8c525c5674dca42b8eb25bdeb2f76b3"
}
},
"libp2p": {
"version": "0.0.2",
"vcsRevision": "eeb3c210a37408716b6a8b45f578adf87610cef2",
"url": "https://github.com/status-im/nim-libp2p.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"dnsclient",
"bearssl",
"chronicles",
"chronos",
"metrics",
"secp256k1",
"stew",
"websock"
],
"checksums": {
"sha1": "e9e9b93e6e425e47df1eea01a8e9efeac6e0fc97"
}
},
"combparser": {
"version": "0.2.0",
"vcsRevision": "ba4464c005d7617c008e2ed2ebc1ba52feb469c6",
"url": "https://github.com/PMunch/combparser.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a3635260961a893b88f69aac19f1b24e032a7e97"
}
},
"protobuf_serialization": {
"version": "0.2.0",
"vcsRevision": "f7d671f877e01213494aac7903421ccdbe70616f",
"url": "https://github.com/status-im/nim-protobuf-serialization.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"faststreams",
"serialization",
"combparser"
],
"checksums": {
"sha1": "9418459027d0d5eb30a974649dc615a76e8e4aca"
}
},
"confutils": {
"version": "0.1.0",
"vcsRevision": "0435e67832b6bb8dfdf0ddb102903e9d820206d2",
"url": "https://github.com/status-im/nim-confutils.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "1edab14b434aca6ae28e2385982fa60d623c600a"
}
},
"news": {
"version": "0.5",
"vcsRevision": "e79420e835489132aaa412f993b565f5dd6295f4",
"url": "https://github.com/status-im/news",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a5f1789bf650822156712fd3bdec1bf6ab4ac42e"
}
},
"json_rpc": {
"version": "0.0.2",
"vcsRevision": "5a281760803907f4989cacf109b516381dfbbe11",
"url": "https://github.com/status-im/nim-json-rpc",
"downloadMethod": "git",
"dependencies": [
"stew",
"nimcrypto",
"stint",
"chronos",
"httputils",
"chronicles",
"news",
"websock",
"json_serialization"
],
"checksums": {
"sha1": "3ec28a4c9e5dcd3210e85dfcfdd0a6baf46eccbe"
}
},
"ethers": {
"version": "0.1.7",
"vcsRevision": "270d358b869d02a4c625dde971f799db336670fb",
"url": "https://github.com/status-im/nim-ethers",
"downloadMethod": "git",
"dependencies": [
"chronos",
"contractabi",
"questionable",
"upraises",
"json_rpc",
"stint",
"stew"
],
"checksums": {
"sha1": "3eb78a87744d5894595f33a36b21348a59d8f1a5"
}
},
"libp2pdht": {
"version": "0.0.1",
"vcsRevision": "9a872518d621bf8b390f88cd65617bca6aca1d2d",
"url": "https://github.com/status-im/nim-libp2p-dht.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"bearssl",
"chronicles",
"chronos",
"libp2p",
"metrics",
"protobuf_serialization",
"secp256k1",
"stew",
"stint",
"asynctest"
],
"checksums": {
"sha1": "d97e8b751e11ccc7e059b79fb1a046d2b0d0e872"
}
},
"lrucache": {
"version": "1.2.1",
"vcsRevision": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd",
"url": "https://github.com/status-im/lrucache.nim",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "2c4365d10029d6f6a8b92a712e9002ac3886b07d"
}
},
"leopard": {
"version": "0.0.1",
"vcsRevision": "2a6a63923e9b95676b5ae7ff2c346be0e63e753c",
"url": "https://github.com/status-im/nim-leopard",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2",
"upraises"
],
"checksums": {
"sha1": "e71db348018eab26f3059e1c03bf3088c5109cfe"
}
},
"taskpools": {
"version": "0.0.3",
"vcsRevision": "8d408ac6cfc9c24ec8b7b65d5993e85050dcbaa9",
"url": "https://github.com/status-im/nim-taskpools.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "37bbbbb03d9b893af6980592624211ab057392c0"
}
},
"blscurve": {
"version": "0.0.1",
"vcsRevision": "0237e4e0e914fc19359c18a66406d33bc942775c",
"url": "https://github.com/status-im/nim-blscurve",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"stew",
"taskpools"
],
"checksums": {
"sha1": "65f58854ffd2098e0d0ca08f6ea0efb3c27529e0"
}
}
}
}

View File

@ -8,6 +8,7 @@
## those terms.
import std/sequtils
import std/sugar
import pkg/chronos
import pkg/libp2p
import pkg/questionable
@ -24,33 +25,28 @@ type
testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool
iteratorIndex: int
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =
self.delBlockCids.add(cid)
self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid)
dec self.iteratorIndex
return success()
method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} =
method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} =
if self.getBlockExpirationsThrows:
raise new CatchableError
self.getBeMaxNumber = maxNumber
self.getBeOffset = offset
var iter = AsyncIter[?BlockExpiration]()
let
testBlockExpirationsCpy = @(self.testBlockExpirations)
limit = min(offset + maxNumber, len(testBlockExpirationsCpy))
self.iteratorIndex = offset
var numberLeft = maxNumber
proc next(): Future[?BlockExpiration] {.async.} =
if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations):
dec numberLeft
let selectedBlock = self.testBlockExpirations[self.iteratorIndex]
inc self.iteratorIndex
return selectedBlock.some
iter.finish
return BlockExpiration.none
let
iter1 = AsyncIter[int].new(offset..<limit)
iter2 = map[int, BlockExpiration](iter1,
proc (i: int): Future[BlockExpiration] {.async.} =
testBlockExpirationsCpy[i]
)
iter.next = next
return success iter
success(iter2)

View File

@ -0,0 +1,33 @@
import pkg/chronos
import pkg/codex/sales
import pkg/codex/stores
import pkg/questionable/results
type
MockReservations* = ref object of Reservations
createReservationThrowBytesOutOfBoundsError: bool
proc new*(
T: type MockReservations,
repo: RepoStore
): MockReservations =
## Create a mock clock instance
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) =
self.createReservationThrowBytesOutOfBoundsError = flag
method createReservation*(
self: MockReservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256): Future[?!Reservation] {.async.} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)

View File

@ -5,7 +5,6 @@ import std/cpuinfo
import pkg/libp2p
import pkg/chronos
import pkg/taskpools
import pkg/codex/codextypes
import pkg/codex/chunker
import pkg/codex/stores

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos
import pkg/stew/byteutils
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import pkg/stint
@ -32,6 +33,7 @@ import pkg/codex/discovery
import pkg/codex/erasure
import pkg/codex/merkletree
import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore/coders
import pkg/codex/utils/asynciter
import pkg/codex/indexingstrategy
@ -110,10 +112,11 @@ asyncchecksuite "Test Node - Host contracts":
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
check blkMd.expiry == expectedExpiry
test "onStore callback is set":
check sales.onStore.isSome
@ -131,7 +134,7 @@ asyncchecksuite "Test Node - Host contracts":
return success()
(await onStore(request, 1.u256, onBlocks)).tryGet()
check fetchedBytes == 262144
check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots)
@ -139,7 +142,8 @@ asyncchecksuite "Test Node - Host contracts":
for index in indexer.getIndicies(1):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
expiry = await localStoreMetaDs.get(expiryKey)
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970
check blkMd.expiry == request.expiry.toSecondsSince1970

View File

@ -9,6 +9,7 @@ import std/cpuinfo
import pkg/chronos
import pkg/stew/byteutils
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import pkg/stint

View File

@ -1,20 +1,66 @@
import std/unittest
import pkg/chronos
import pkg/questionable
import pkg/datastore
import pkg/stew/byteutils
import pkg/codex/contracts/requests
import pkg/codex/sales/states/preparing
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled
import pkg/codex/sales/states/ignored
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import ../../../asynctest
import ../../helpers
import ../../examples
import ../../helpers/mockmarket
import ../../helpers/mockreservations
import ../../helpers/mockclock
suite "sales state 'preparing'":
asyncchecksuite "sales state 'preparing'":
let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var agent: SalesAgent
var state: SalePreparing
var repo: RepoStore
var availability: Availability
var context: SalesContext
var reservations: MockReservations
setup:
availability = Availability(
totalSize: request.ask.slotSize + 100.u256,
freeSize: request.ask.slotSize + 100.u256,
duration: request.ask.duration + 60.u256,
minPrice: request.ask.pricePerSlot - 10.u256,
maxCollateral: request.ask.collateral + 400.u256
)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
state = SalePreparing.new()
context = SalesContext(
market: market,
clock: clock
)
reservations = MockReservations.new(repo)
context.reservations = reservations
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
teardown:
await repo.stop()
test "switches to cancelled state when request expires":
let next = state.onCancelled(request)
@ -27,3 +73,28 @@ suite "sales state 'preparing'":
test "switches to filled state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
proc createAvailability() {.async.} =
let a = await reservations.createAvailability(
availability.totalSize,
availability.duration,
availability.minPrice,
availability.maxCollateral
)
availability = a.get
test "run switches to ignored when no availability":
let next = await state.run(agent)
check !next of SaleIgnored
test "run switches to downloading when reserved":
await createAvailability()
let next = await state.run(agent)
check !next of SaleDownloading
test "run switches to ignored when reserve fails with BytesOutOfBounds":
await createAvailability()
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
let next = await state.run(agent)
check !next of SaleIgnored

View File

@ -1,4 +1,5 @@
import std/random
import std/sequtils
import pkg/questionable
import pkg/questionable/results
@ -6,6 +7,7 @@ import pkg/chronos
import pkg/datastore
import pkg/codex/stores
import pkg/codex/errors
import pkg/codex/sales
import pkg/codex/utils/json
@ -13,6 +15,8 @@ import ../../asynctest
import ../examples
import ../helpers
const CONCURRENCY_TESTS_COUNT = 1000
asyncchecksuite "Reservations module":
var
repo: RepoStore
@ -73,9 +77,9 @@ asyncchecksuite "Reservations module":
check availability.id != AvailabilityId.default
test "creating availability reserves bytes in repo":
let orig = repo.available
let orig = repo.available.uint
let availability = createAvailability()
check repo.available == (orig.u256 - availability.freeSize).truncate(uint)
check repo.available.uint == (orig.u256 - availability.freeSize).truncate(uint)
test "can get all availabilities":
let availability1 = createAvailability()
@ -148,6 +152,39 @@ asyncchecksuite "Reservations module":
check created.isErr
check created.error of BytesOutOfBoundsError
test "cannot create reservation larger than availability size - concurrency test":
proc concurrencyTest(): Future[void] {.async.} =
let availability = createAvailability()
let one = reservations.createReservation(
availability.id,
availability.totalSize - 1,
RequestId.example,
UInt256.example
)
let two = reservations.createReservation(
availability.id,
availability.totalSize,
RequestId.example,
UInt256.example
)
let oneResult = await one
let twoResult = await two
check oneResult.isErr or twoResult.isErr
if oneResult.isErr:
check oneResult.error of BytesOutOfBoundsError
if twoResult.isErr:
check twoResult.error of BytesOutOfBoundsError
var futures: seq[Future[void]]
for _ in 1..CONCURRENCY_TESTS_COUNT:
futures.add(concurrencyTest())
await allFuturesThrowing(futures)
test "creating reservation reduces availability size":
let availability = createAvailability()
let orig = availability.freeSize
@ -211,7 +248,7 @@ asyncchecksuite "Reservations module":
check updated.freeSize > orig
check (updated.freeSize - orig) == 200.u256
check (repo.quotaReservedBytes - origQuota) == 200
check (repo.quotaReservedBytes - origQuota) == 200.NBytes
test "update releases quota when lowering size":
let
@ -220,7 +257,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize - 100
check isOk await reservations.update(availability)
check (origQuota - repo.quotaReservedBytes) == 100
check (origQuota - repo.quotaReservedBytes) == 100.NBytes
test "update reserves quota when growing size":
let
@ -229,7 +266,7 @@ asyncchecksuite "Reservations module":
availability.totalSize = availability.totalSize + 100
check isOk await reservations.update(availability)
check (repo.quotaReservedBytes - origQuota) == 100
check (repo.quotaReservedBytes - origQuota) == 100.NBytes
test "reservation can be partially released":
let availability = createAvailability()
@ -333,17 +370,17 @@ asyncchecksuite "Reservations module":
check got.error of NotExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
check reservations.available == DefaultQuotaBytes.uint
test "reports quota available to be reserved":
check reservations.hasAvailable(DefaultQuotaBytes - 1)
check reservations.hasAvailable(DefaultQuotaBytes.uint - 1)
test "reports quota not available to be reserved":
check not reservations.hasAvailable(DefaultQuotaBytes + 1)
check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1)
test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability(
(DefaultQuotaBytes + 1).u256,
(DefaultQuotaBytes.uint + 1).u256,
UInt256.example,
UInt256.example,
UInt256.example

View File

@ -2,7 +2,7 @@ import std/sequtils
import std/sugar
import std/times
import pkg/chronos
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales

View File

@ -6,5 +6,9 @@ import ./states/testinitialproving
import ./states/testfilled
import ./states/testproving
import ./states/testsimulatedproving
import ./states/testcancelled
import ./states/testerrored
import ./states/testignored
import ./states/testpreparing
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,71 @@
import std/unittest
import std/random
import pkg/stew/objects
import pkg/questionable
import pkg/questionable/results
import pkg/codex/clock
import pkg/codex/stores/repostore/types
import pkg/codex/stores/repostore/coders
import ../../helpers
checksuite "Test coders":
proc rand(T: type NBytes): T =
rand(Natural).NBytes
proc rand(E: type[enum]): E =
let ordinals = enumRangeInt64(E)
E(ordinals[rand(ordinals.len - 1)])
proc rand(T: type QuotaUsage): T =
QuotaUsage(
used: rand(NBytes),
reserved: rand(NBytes)
)
proc rand(T: type BlockMetadata): T =
BlockMetadata(
expiry: rand(SecondsSince1970),
size: rand(NBytes),
refCount: rand(Natural)
)
proc rand(T: type DeleteResult): T =
DeleteResult(
kind: rand(DeleteResultKind),
released: rand(NBytes)
)
proc rand(T: type StoreResult): T =
StoreResult(
kind: rand(StoreResultKind),
used: rand(NBytes)
)
test "Natural encode/decode":
for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]:
check:
success(val) == Natural.decode(encode(val))
test "QuotaUsage encode/decode":
for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)):
check:
success(val) == QuotaUsage.decode(encode(val))
test "BlockMetadata encode/decode":
for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):
check:
success(val) == BlockMetadata.decode(encode(val))
test "DeleteResult encode/decode":
for val in newSeqWith[DeleteResult](100, rand(DeleteResult)):
check:
success(val) == DeleteResult.decode(encode(val))
test "StoreResult encode/decode":
for val in newSeqWith[StoreResult](100, rand(StoreResult)):
check:
success(val) == StoreResult.decode(encode(val))

View File

@ -34,10 +34,10 @@ checksuite "BlockMaintainer":
var testBe2: BlockExpiration
var testBe3: BlockExpiration
proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration =
proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration =
BlockExpiration(
cid: bt.Block.example.cid,
expiration: expiration
expiry: expiry
)
setup:
@ -186,4 +186,3 @@ checksuite "BlockMaintainer":
await invokeTimerManyTimes()
# Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]

View File

@ -23,6 +23,8 @@ import ../helpers/mockclock
import ../examples
import ./commonstoretests
import ./repostore/testcoders
checksuite "Test RepoStore start/stop":
var
@ -34,24 +36,24 @@ checksuite "Test RepoStore start/stop":
metaDs = SQLiteDatastore.new(Memory).tryGet()
test "Should set started flag once started":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start()
check repo.started
test "Should set started flag to false once stopped":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start()
await repo.stop()
check not repo.started
test "Should allow start to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.start()
await repo.start()
check repo.started
test "Should allow stop to be called multiple times":
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
await repo.stop()
await repo.stop()
check not repo.started
@ -73,7 +75,7 @@ asyncchecksuite "RepoStore":
mockClock = MockClock.new()
mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb)
teardown:
(await repoDs.close()).tryGet
@ -85,117 +87,107 @@ asyncchecksuite "RepoStore":
test "Should update current used bytes on block put":
let blk = createTestBlock(200)
check repo.quotaUsedBytes == 0
check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet
check:
repo.quotaUsedBytes == 200
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
repo.quotaUsedBytes == 200'nb
test "Should update current used bytes on block delete":
let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0
check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100
check repo.quotaUsedBytes == 100'nb
(await repo.delBlock(blk.cid)).tryGet
check:
repo.quotaUsedBytes == 0
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
repo.quotaUsedBytes == 0'nb
test "Should not update current used bytes if block exist":
let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0
check repo.quotaUsedBytes == 0'nb
(await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100
check repo.quotaUsedBytes == 100'nb
# put again
(await repo.putBlock(blk)).tryGet
check repo.quotaUsedBytes == 100
check:
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
check repo.quotaUsedBytes == 100'nb
test "Should fail storing passed the quota":
let blk = createTestBlock(300)
check repo.totalUsed == 0
expect QuotaUsedError:
check repo.totalUsed == 0'nb
expect QuotaNotEnoughError:
(await repo.putBlock(blk)).tryGet
test "Should reserve bytes":
let blk = createTestBlock(100)
check repo.totalUsed == 0
check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100
check repo.totalUsed == 100'nb
(await repo.reserve(100)).tryGet
(await repo.reserve(100'nb)).tryGet
check:
repo.totalUsed == 200
repo.quotaUsedBytes == 100
repo.quotaReservedBytes == 100
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
repo.totalUsed == 200'nb
repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 100'nb
test "Should not reserve bytes over max quota":
let blk = createTestBlock(100)
check repo.totalUsed == 0
check repo.totalUsed == 0'nb
(await repo.putBlock(blk)).tryGet
check repo.totalUsed == 100
check repo.totalUsed == 100'nb
expect QuotaNotEnoughError:
(await repo.reserve(101)).tryGet
(await repo.reserve(101'nb)).tryGet
check:
repo.totalUsed == 100
repo.quotaUsedBytes == 100
repo.quotaReservedBytes == 0
expect DatastoreKeyNotFound:
discard (await metaDs.get(QuotaReservedKey)).tryGet
repo.totalUsed == 100'nb
repo.quotaUsedBytes == 100'nb
repo.quotaReservedBytes == 0'nb
test "Should release bytes":
discard createTestBlock(100)
check repo.totalUsed == 0
(await repo.reserve(100)).tryGet
check repo.totalUsed == 100
check repo.totalUsed == 0'nb
(await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100'nb
(await repo.release(100)).tryGet
(await repo.release(100'nb)).tryGet
check:
repo.totalUsed == 0
repo.quotaUsedBytes == 0
repo.quotaReservedBytes == 0
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
repo.totalUsed == 0'nb
repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 0'nb
test "Should not release bytes less than quota":
check repo.totalUsed == 0
(await repo.reserve(100)).tryGet
check repo.totalUsed == 100
check repo.totalUsed == 0'nb
(await repo.reserve(100'nb)).tryGet
check repo.totalUsed == 100'nb
expect CatchableError:
(await repo.release(101)).tryGet
expect RangeDefect:
(await repo.release(101'nb)).tryGet
check:
repo.totalUsed == 100
repo.quotaUsedBytes == 0
repo.quotaReservedBytes == 100
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
repo.totalUsed == 100'nb
repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 100'nb
proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} =
let
query = Query.init(key)
responseIter = (await metaDs.query(query)).tryGet
response = (await allFinished(toSeq(responseIter)))
.mapIt(it.read.tryGet)
.filterIt(it.key.isSome)
return response
proc getExpirations(): Future[seq[BlockExpiration]] {.async.} =
let iter = (await repo.getBlockExpirations(100, 0)).tryGet()
var res = newSeq[BlockExpiration]()
for fut in iter:
let be = await fut
res.add(be)
res
test "Should store block expiration timestamp":
let
@ -203,49 +195,40 @@ asyncchecksuite "RepoStore":
blk = createTestBlock(100)
let
expectedExpiration: SecondsSince1970 = 123 + 10
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, duration.some)).tryGet
let response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in expirations
test "Should store block with default expiration timestamp when not provided":
let
blk = createTestBlock(100)
let
expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds)
(await repo.putBlock(blk)).tryGet
let response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in expirations
test "Should refuse update expiry with negative timestamp":
let
blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + 10
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in expirations
expect ValueError:
(await repo.ensureExpiry(blk.cid, -1)).tryGet
@ -262,56 +245,45 @@ asyncchecksuite "RepoStore":
test "Should update block expiration timestamp when new expiration is farther":
let
duration = 10
blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20)
(await repo.putBlock(blk, some duration.seconds)).tryGet
(await repo.putBlock(blk, some 10.seconds)).tryGet
var response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
(await repo.ensureExpiry(blk.cid, now + 20)).tryGet
response = await queryMetaDs(expectedKey)
let updatedExpirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == updatedExpectedExpiration.toBytes
expectedExpiration notin updatedExpirations
updatedExpectedExpiration in updatedExpirations
test "Should not update block expiration timestamp when current expiration is farther then new one":
let
duration = 10
blk = createTestBlock(100)
expectedExpiration: SecondsSince1970 = now + duration
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5)
(await repo.putBlock(blk, some 10.seconds)).tryGet
(await repo.putBlock(blk, some duration.seconds)).tryGet
var response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
(await repo.ensureExpiry(blk.cid, now + 5)).tryGet
response = await queryMetaDs(expectedKey)
let updatedExpirations = await getExpirations()
check:
response.len == 1
!response[0].key == expectedKey
response[0].data == expectedExpiration.toBytes
expectedExpiration in updatedExpirations
updatedExpectedExpiration notin updatedExpirations
test "delBlock should remove expiration metadata":
let
@ -321,19 +293,19 @@ asyncchecksuite "RepoStore":
(await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet
let response = await queryMetaDs(expectedKey)
let expirations = await getExpirations()
check:
response.len == 0
expirations.len == 0
test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations
for be in toSeq(iter):
if value =? (await be):
expirations.add(value)
for beFut in toSeq(iter):
let value = await beFut
expirations.add(value)
return expirations
let
@ -343,12 +315,12 @@ asyncchecksuite "RepoStore":
blk3 = createTestBlock(12)
let
expectedExpiration: SecondsSince1970 = 123 + 10
expectedExpiration: SecondsSince1970 = now + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check:
be.cid == expectedBlock.cid
be.expiration == expectedExpiration
be.expiry == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet

View File

@ -58,6 +58,14 @@ suite "Indexing strategies":
expect IndexingWrongIterationsError:
discard LinearStrategy.init(0, 10, 0)
test "should split elements evenly when possible":
let
l = LinearStrategy.init(0, 11, 3)
check:
toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it)
toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it)
toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it)
test "linear - oob":
expect IndexingError:
discard linear.getIndicies(3)

View File

@ -74,3 +74,36 @@ checksuite "Manifest":
test "Should encode/decode to/from verifiable manifest":
check:
encodeDecode(verifiableManifest) == verifiableManifest
suite "Manifest - Attribute Inheritance":
proc makeProtectedManifest(strategy: StrategyType): Manifest =
Manifest.new(
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 1.MiBs,
datasetSize = 100.MiBs,
),
treeCid = Cid.example,
datasetSize = 200.MiBs,
ecK = 1,
ecM = 1,
strategy = strategy
)
test "Should preserve interleaving strategy for protected manifest in verifiable manifest":
var verifiable = Manifest.new(
manifest = makeProtectedManifest(SteppedStrategy),
verifyRoot = Cid.example,
slotRoots = @[Cid.example, Cid.example]
).tryGet()
check verifiable.protectedStrategy == SteppedStrategy
verifiable = Manifest.new(
manifest = makeProtectedManifest(LinearStrategy),
verifyRoot = Cid.example,
slotRoots = @[Cid.example, Cid.example]
).tryGet()
check verifiable.protectedStrategy == LinearStrategy

View File

@ -1,12 +1,15 @@
import pkg/chronos
import pkg/questionable/results
import pkg/codex/streams
import pkg/codex/stores
import pkg/codex/manifest
import pkg/codex/blocktype as bt
import pkg/codex/[
streams,
stores,
indexingstrategy,
manifest,
blocktype as bt]
import ../asynctest
import ./examples
import ./helpers
asyncchecksuite "StoreStream":
@ -99,3 +102,40 @@ asyncchecksuite "StoreStream":
await stream.readExactly(addr buf[0], 15)
check sequentialBytes(buf,15,0)
suite "StoreStream - Size Tests":
var stream: StoreStream
teardown:
await stream.close()
test "Should return dataset size as stream size":
let manifest = Manifest.new(
treeCid = Cid.example,
datasetSize = 80.NBytes,
blockSize = 10.NBytes
)
stream = StoreStream.new(CacheStore.new(), manifest)
check stream.size == 80
test "Should not count parity/padding bytes as part of stream size":
let protectedManifest = Manifest.new(
treeCid = Cid.example,
datasetSize = 120.NBytes, # size including parity bytes
blockSize = 10.NBytes,
version = CIDv1,
hcodec = Sha256HashCodec,
codec = BlockCodec,
ecK = 2,
ecM = 1,
originalTreeCid = Cid.example,
originalDatasetSize = 80.NBytes, # size without parity bytes
strategy = StrategyType.SteppedStrategy
)
stream = StoreStream.new(CacheStore.new(), protectedManifest)
check stream.size == 80

View File

@ -1,5 +1,6 @@
import std/sequtils
from pkg/libp2p import `==`
import pkg/codex/units
import ./twonodes
twonodessuite "REST API", debug1 = false, debug2 = false:
@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false:
discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get
let space = client1.space().tryGet()
check:
space.totalBlocks == 2.uint
space.quotaMaxBytes == 8589934592.uint
space.quotaUsedBytes == 65592.uint
space.quotaReservedBytes == 12.uint
space.totalBlocks == 2
space.quotaMaxBytes == 8589934592.NBytes
space.quotaUsedBytes == 65592.NBytes
space.quotaReservedBytes == 12.NBytes
test "node lists local files":
let content1 = "some file contents"

View File

@ -1,3 +0,0 @@
deps=""
resolver="MaxVer"
overrides="urls.rules"