diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index fb79c510..4754e3ae 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -9,7 +9,7 @@ on:
 
 env:
   cache_nonce: 0 # Allows for easily busting actions/cache caches
-  nim_version: v1.6.14
+  nim_version: pinned
 
 concurrency:
diff --git a/.github/workflows/nim-matrix.yml b/.github/workflows/nim-matrix.yml
index 9328c8af..0b57c8bf 100644
--- a/.github/workflows/nim-matrix.yml
+++ b/.github/workflows/nim-matrix.yml
@@ -6,7 +6,7 @@ on:
 
 env:
   cache_nonce: 0 # Allows for easily busting actions/cache caches
-  nim_version: v1.6.14, v1.6.16, v1.6.18
+  nim_version: pinned, v1.6.16, v1.6.18
 
 jobs:
   matrix:
diff --git a/Makefile b/Makefile
index e7c224f9..2e1f97ad 100644
--- a/Makefile
+++ b/Makefile
@@ -5,6 +5,30 @@
 # at your option. This file may not be copied, modified, or distributed except
 # according to those terms.
 
+# This is the Nim version used locally and in regular CI builds.
+# Can be a specific version tag, a branch name, or a commit hash.
+# Can be overridden by setting the NIM_COMMIT environment variable
+# before calling make.
+#
+# For readability in CI, if NIM_COMMIT is set to "pinned",
+# this will also default to the version pinned here.
+#
+# If NIM_COMMIT is set to "nimbusbuild", this will use the
+# version pinned by nimbus-build-system.
+PINNED_NIM_VERSION := v1.6.14
+
+ifeq ($(NIM_COMMIT),)
+NIM_COMMIT := $(PINNED_NIM_VERSION)
+else ifeq ($(NIM_COMMIT),pinned)
+NIM_COMMIT := $(PINNED_NIM_VERSION)
+endif
+
+ifeq ($(NIM_COMMIT),nimbusbuild)
+undefine NIM_COMMIT
+else
+export NIM_COMMIT
+endif
+
 SHELL := bash # the shell used internally by Make
 
 # used inside the included makefiles
diff --git a/atlas.lock b/atlas.lock
deleted file mode 100644
index 497023cf..00000000
--- a/atlas.lock
+++ /dev/null
@@ -1,209 +0,0 @@
-{
-  "clangVersion": "",
-  "gccVersion": "",
-  "hostCPU": "arm64",
-  "hostOS": "macosx",
-  "items": {
-    "asynctest": {
-      "commit": "8e2f4e73b97123be0f0041c129942b32df23ecb1",
-      "dir": "vendor/asynctest",
-      "url": "https://github.com/codex-storage/asynctest"
-    },
-    "dnsclient.nim": {
-      "commit": "23214235d4784d24aceed99bbfe153379ea557c8",
-      "dir": "vendor/dnsclient.nim",
-      "url": "https://github.com/ba0f3/dnsclient.nim"
-    },
-    "lrucache.nim": {
-      "commit": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd",
-      "dir": "vendor/lrucache.nim",
-      "url": "https://github.com/status-im/lrucache.nim"
-    },
-    "nim-bearssl": {
-      "commit": "99fcb3405c55b27cfffbf60f5368c55da7346f23",
-      "dir": "vendor/nim-bearssl",
-      "url": "https://github.com/status-im/nim-bearssl"
-    },
-    "nim-blscurve": {
-      "commit": "48d8668c5a9a350d3a7ee0c3713ef9a11980a40d",
-      "dir": "vendor/nim-blscurve",
-      "url": "https://github.com/status-im/nim-blscurve"
-    },
-    "nim-chronicles": {
-      "commit": "c9c8e58ec3f89b655a046c485f622f9021c68b61",
-      "dir": "vendor/nim-chronicles",
-      "url": "https://github.com/status-im/nim-chronicles"
-    },
-    "nim-chronos": {
-      "commit": "0277b65be2c7a365ac13df002fba6e172be55537",
-      "dir": "vendor/nim-chronos",
-      "url": "https://github.com/status-im/nim-chronos"
-    },
-    "nim-confutils": {
-      "commit": "2028b41602b3abf7c9bf450744efde7b296707a2",
-      "dir": "vendor/nim-confutils",
-      "url": "https://github.com/status-im/nim-confutils"
-    },
-    "nim-contract-abi": {
-      "commit": "61f8f59b3917d8e27c6eb4330a6d8cf428e98b2d",
-      "dir": "vendor/nim-contract-abi",
-      "url": "https://github.com/status-im/nim-contract-abi"
-    },
-    "nim-datastore": {
-      "commit": "0cde8aeb67c59fd0ac95496dc6b5e1168d6632aa",
-      "dir": "vendor/nim-datastore",
"url": "https://github.com/codex-storage/nim-datastore" - }, - "nim-faststreams": { - "commit": "720fc5e5c8e428d9d0af618e1e27c44b42350309", - "dir": "vendor/nim-faststreams", - "url": "https://github.com/status-im/nim-faststreams" - }, - "nim-http-utils": { - "commit": "3b491a40c60aad9e8d3407443f46f62511e63b18", - "dir": "vendor/nim-http-utils", - "url": "https://github.com/status-im/nim-http-utils" - }, - "nim-json-rpc": { - "commit": "0bf2bcbe74a18a3c7a709d57108bb7b51e748a92", - "dir": "vendor/nim-json-rpc", - "url": "https://github.com/status-im/nim-json-rpc" - }, - "nim-json-serialization": { - "commit": "bb53d49caf2a6c6cf1df365ba84af93cdcfa7aa3", - "dir": "vendor/nim-json-serialization", - "url": "https://github.com/status-im/nim-json-serialization" - }, - "nim-leopard": { - "commit": "1a6f2ab7252426a6ac01482a68b75d0c3b134cf0", - "dir": "vendor/nim-leopard", - "url": "https://github.com/status-im/nim-leopard" - }, - "nim-libbacktrace": { - "commit": "b29c22ba0ef13de50b779c776830dbea1d50cd33", - "dir": "vendor/nim-libbacktrace", - "url": "https://github.com/status-im/nim-libbacktrace" - }, - "nim-libp2p": { - "commit": "440461b24b9e66542b34d26a0b908c17f6549d05", - "dir": "vendor/nim-libp2p", - "url": "https://github.com/status-im/nim-libp2p" - }, - "nim-libp2p-dht": { - "commit": "fdd02450aa6979add7dabd29a3ba0f8738bf89f8", - "dir": "vendor/nim-libp2p-dht", - "url": "https://github.com/status-im/nim-libp2p-dht" - }, - "nim-metrics": { - "commit": "6142e433fc8ea9b73379770a788017ac528d46ff", - "dir": "vendor/nim-metrics", - "url": "https://github.com/status-im/nim-metrics" - }, - "nim-nat-traversal": { - "commit": "27d314d65c9078924b3239fe4e2f5af0c512b28c", - "dir": "vendor/nim-nat-traversal", - "url": "https://github.com/status-im/nim-nat-traversal" - }, - "nim-nitro": { - "commit": "6b4c455bf4dad7449c1580055733a1738fcd5aec", - "dir": "vendor/nim-nitro", - "url": "https://github.com/status-im/nim-nitro" - }, - "nim-presto": { - "commit": "3984431dc0fc829eb668e12e57e90542b041d298", - "dir": "vendor/nim-presto", - "url": "https://github.com/status-im/nim-presto" - }, - "nim-protobuf-serialization": { - "commit": "28214b3e40c755a9886d2ec8f261ec48fbb6bec6", - "dir": "vendor/nim-protobuf-serialization", - "url": "https://github.com/status-im/nim-protobuf-serialization" - }, - "nim-results": { - "commit": "f3c666a272c69d70cb41e7245e7f6844797303ad", - "dir": "vendor/nim-results", - "url": "https://github.com/arnetheduck/nim-results" - }, - "nim-secp256k1": { - "commit": "2acbbdcc0e63002a013fff49f015708522875832", - "dir": "vendor/nim-secp256k1", - "url": "https://github.com/status-im/nim-secp256k1" - }, - "nim-serialization": { - "commit": "384eb2561ee755446cff512a8e057325848b86a7", - "dir": "vendor/nim-serialization", - "url": "https://github.com/status-im/nim-serialization" - }, - "nim-sqlite3-abi": { - "commit": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3", - "dir": "vendor/nim-sqlite3-abi", - "url": "https://github.com/arnetheduck/nim-sqlite3-abi" - }, - "nim-stew": { - "commit": "7afe7e3c070758cac1f628e4330109f3ef6fc853", - "dir": "vendor/nim-stew", - "url": "https://github.com/status-im/nim-stew" - }, - "nim-taskpools": { - "commit": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a", - "dir": "vendor/nim-taskpools", - "url": "https://github.com/status-im/nim-taskpools" - }, - "nim-testutils": { - "commit": "b56a5953e37fc5117bd6ea6dfa18418c5e112815", - "dir": "vendor/nim-testutils", - "url": "https://github.com/status-im/nim-testutils" - }, - "nim-toml-serialization": { - "commit": 
"86d477136f105f04bfd0dd7c0e939593d81fc581", - "dir": "vendor/nim-toml-serialization", - "url": "https://github.com/status-im/nim-toml-serialization" - }, - "nim-unittest2": { - "commit": "b178f47527074964f76c395ad0dfc81cf118f379", - "dir": "vendor/nim-unittest2", - "url": "https://github.com/status-im/nim-unittest2" - }, - "nim-websock": { - "commit": "2c3ae3137f3c9cb48134285bd4a47186fa51f0e8", - "dir": "vendor/nim-websock", - "url": "https://github.com/status-im/nim-websock" - }, - "nim-zlib": { - "commit": "f34ca261efd90f118dc1647beefd2f7a69b05d93", - "dir": "vendor/nim-zlib", - "url": "https://github.com/status-im/nim-zlib" - }, - "nim-stint": { - "dir": "vendor/stint", - "url": "https://github.com/status-im/nim-stint", - "commit": "86621eced1dcfb5e25903019ebcfc76ed9128ec5" - }, - "nimcrypto": { - "commit": "24e006df85927f64916e60511620583b11403178", - "dir": "vendor/nimcrypto", - "url": "https://github.com/status-im/nimcrypto" - }, - "npeg": { - "commit": "b15a10e388b91b898c581dbbcb6a718d46b27d2f", - "dir": "vendor/npeg", - "url": "https://github.com/zevv/npeg" - }, - "questionable": { - "commit": "47692e0d923ada8f7f731275b2a87614c0150987", - "dir": "vendor/questionable", - "url": "https://github.com/codex-storage/questionable" - }, - "upraises": { - "commit": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2", - "dir": "vendor/upraises", - "url": "https://github.com/markspanbroek/upraises" - } - }, - "nimVersion": "1.6.14", - "nimbleFile": { - "content": "# Package\n\nversion = \"0.3.2\"\nauthor = \"Status Research & Development GmbH\"\ndescription = \"DHT based on the libp2p Kademlia spec\"\nlicense = \"MIT\"\nskipDirs = @[\"tests\"]\n\n\n# Dependencies\nrequires \"nim >= 1.2.0\"\nrequires \"secp256k1#2acbbdcc0e63002a013fff49f015708522875832\" # >= 0.5.2 & < 0.6.0\nrequires \"protobuf_serialization\" # >= 0.2.0 & < 0.3.0\nrequires \"nimcrypto == 0.5.4\"\nrequires \"bearssl#head\"\nrequires \"chronicles >= 0.10.2 & < 0.11.0\"\nrequires \"chronos == 3.2.0\" # >= 3.0.11 & < 3.1.0\nrequires \"libp2p#unstable\"\nrequires \"metrics\"\nrequires \"stew#head\"\nrequires \"stint\"\nrequires \"asynctest >= 0.3.1 & < 0.4.0\"\nrequires \"https://github.com/codex-storage/nim-datastore#head\"\nrequires \"questionable\"\n\ninclude \"build.nims\"\n\n", - "filename": "" - }, - "nimcfg": "############# begin Atlas config section ##########\n--noNimblePath\n--path:\"vendor/nim-secp256k1\"\n--path:\"vendor/nim-protobuf-serialization\"\n--path:\"vendor/nimcrypto\"\n--path:\"vendor/nim-bearssl\"\n--path:\"vendor/nim-chronicles\"\n--path:\"vendor/nim-chronos\"\n--path:\"vendor/nim-libp2p\"\n--path:\"vendor/nim-metrics\"\n--path:\"vendor/nim-stew\"\n--path:\"vendor/nim-stint\"\n--path:\"vendor/asynctest\"\n--path:\"vendor/nim-datastore\"\n--path:\"vendor/questionable\"\n--path:\"vendor/nim-faststreams\"\n--path:\"vendor/nim-serialization\"\n--path:\"vendor/npeg/src\"\n--path:\"vendor/nim-unittest2\"\n--path:\"vendor/nim-testutils\"\n--path:\"vendor/nim-json-serialization\"\n--path:\"vendor/nim-http-utils\"\n--path:\"vendor/dnsclient.nim/src\"\n--path:\"vendor/nim-websock\"\n--path:\"vendor/nim-results\"\n--path:\"vendor/nim-sqlite3-abi\"\n--path:\"vendor/upraises\"\n--path:\"vendor/nim-zlib\"\n############# end Atlas config section ##########\n" -} diff --git a/codex.nimble b/codex.nimble index 7a99779f..f3033861 100644 --- a/codex.nimble +++ b/codex.nimble @@ -6,31 +6,4 @@ binDir = "build" srcDir = "." 
installFiles = @["build.nims"] -requires "nim >= 1.2.0" -requires "asynctest >= 0.5.1 & < 0.6.0" -requires "bearssl >= 0.1.4" -requires "chronicles >= 0.7.2" -requires "chronos >= 2.5.2" -requires "confutils" -requires "ethers >= 0.7.3 & < 0.8.0" -requires "libbacktrace" -requires "libp2p" -requires "metrics" -requires "nimcrypto >= 0.4.1" -requires "nitro >= 0.5.1 & < 0.6.0" -requires "presto" -requires "protobuf_serialization >= 0.2.0 & < 0.3.0" -requires "questionable >= 0.10.13 & < 0.11.0" -requires "secp256k1" -requires "serde >= 1.0.0 & < 2.0.0" -requires "stew" -requires "upraises >= 0.1.0 & < 0.2.0" -requires "toml_serialization" -requires "https://github.com/status-im/lrucache.nim#1.2.2" -requires "leopard >= 0.1.0 & < 0.2.0" -requires "blscurve" -requires "libp2pdht" -requires "eth" -requires "https://github.com/codex-storage/nim-poseidon2.git >= 0.1.0 & < 0.2.0" - include "build.nims" diff --git a/codex/codex.nim b/codex/codex.nim index 4ca6d63d..0b9182fb 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -258,7 +258,7 @@ proc new*( repoDs = repoData, metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create metadata store!"), - quotaMaxBytes = config.storageQuota.uint, + quotaMaxBytes = config.storageQuota, blockTtl = config.blockTtl) maintenance = BlockMaintainer.new( @@ -312,7 +312,7 @@ proc new*( taskpool = taskpool) restServer = RestServerRef.new( - codexNode.initRestApi(config, repoStore), + codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), initTAddress(config.apiBindAddress , config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high) diff --git a/codex/conf.nim b/codex/conf.nim index d3f3e5f5..a09c9d84 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -191,6 +191,12 @@ type name: "api-port" abbr: "p" }: Port + apiCorsAllowedOrigin* {. + desc: "The REST Api CORS allowed origin for downloading data. '*' will allow all origins, '' will allow none.", + defaultValue: string.none + defaultValueDesc: "Disallow all cross origin requests to download data" + name: "api-cors-origin" }: Option[string] + repoKind* {. 
desc: "Backend for main repo store (fs, sqlite, leveldb)" defaultValueDesc: "fs" diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index 27444522..d8eeba58 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -49,8 +49,8 @@ func getLinearIndicies( self.checkIteration(iteration) let - first = self.firstIndex + iteration * (self.step + 1) - last = min(first + self.step, self.lastIndex) + first = self.firstIndex + iteration * self.step + last = min(first + self.step - 1, self.lastIndex) getIter(first, last, 1) @@ -94,4 +94,4 @@ func init*( firstIndex: firstIndex, lastIndex: lastIndex, iterations: iterations, - step: divUp((lastIndex - firstIndex), iterations)) + step: divUp((lastIndex - firstIndex + 1), iterations)) diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 486a8fc3..abfd5b4c 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -135,13 +135,6 @@ func isManifest*(mc: MultiCodec): ?!bool = # Various sizes and verification ############################################################ -func bytes*(self: Manifest, pad = true): NBytes = - ## Compute how many bytes corresponding StoreStream(Manifest, pad) will return - if pad or self.protected: - self.blocksCount.NBytes * self.blockSize - else: - self.datasetSize - func rounded*(self: Manifest): int = ## Number of data blocks in *protected* manifest including padding at the end roundUp(self.originalBlocksCount, self.ecK) @@ -238,7 +231,7 @@ func new*( treeCid: Cid, datasetSize: NBytes, ecK, ecM: int, - strategy: StrategyType): Manifest = + strategy = SteppedStrategy): Manifest = ## Create an erasure protected dataset from an ## unprotected one ## @@ -284,7 +277,7 @@ func new*( ecM: int, originalTreeCid: Cid, originalDatasetSize: NBytes, - strategy: StrategyType): Manifest = + strategy = SteppedStrategy): Manifest = Manifest( treeCid: treeCid, @@ -306,7 +299,7 @@ func new*( verifyRoot: Cid, slotRoots: openArray[Cid], cellSize = DefaultCellSize, - strategy = SteppedStrategy): ?!Manifest = + strategy = LinearStrategy): ?!Manifest = ## Create a verifiable dataset from an ## protected one ## @@ -331,6 +324,7 @@ func new*( ecM: manifest.ecM, originalTreeCid: manifest.treeCid, originalDatasetSize: manifest.originalDatasetSize, + protectedStrategy: manifest.protectedStrategy, verifiable: true, verifyRoot: verifyRoot, slotRoots: @slotRoots, diff --git a/codex/merkletree/codex/coders.nim b/codex/merkletree/codex/coders.nim index 62e4f75b..a2d5a24b 100644 --- a/codex/merkletree/codex/coders.nim +++ b/codex/merkletree/codex/coders.nim @@ -14,6 +14,8 @@ push: {.upraises: [].} import pkg/libp2p import pkg/questionable import pkg/questionable/results +import pkg/stew/byteutils +import pkg/serde/json import ../../units import ../../errors @@ -100,3 +102,18 @@ proc decode*(_: type CodexProof, data: seq[byte]): ?!CodexProof = nodes.add node CodexProof.init(mcodec, index.int, nleaves.int, nodes) + +proc fromJson*( + _: type CodexProof, + json: JsonNode +): ?!CodexProof = + expectJsonKind(Cid, JString, json) + var bytes: seq[byte] + try: + bytes = hexToSeqByte(json.str) + except ValueError as err: + return failure(err) + + CodexProof.decode(bytes) + +func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode()) diff --git a/codex/node.nim b/codex/node.nim index dd1ea1c4..f1fad5d8 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -560,7 +560,9 @@ proc onStore( trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) - without 
+  without builder =? Poseidon2Builder.new(
+    self.networkStore, manifest, manifest.verifiableStrategy
+  ), err:
     trace "Unable to create slots builder", err = err.msg
     return failure(err)
 
@@ -585,8 +587,8 @@ proc onStore(
 
     return success()
 
-  without indexer =? manifest.protectedStrategy.init(
-    0, manifest.numSlotBlocks() - 1, manifest.numSlots).catch, err:
+  without indexer =? manifest.verifiableStrategy.init(
+    0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
     trace "Unable to create indexing strategy from protected manifest", err = err.msg
     return failure(err)
diff --git a/codex/rest/api.nim b/codex/rest/api.nim
index 8405d917..a453985b 100644
--- a/codex/rest/api.nim
+++ b/codex/rest/api.nim
@@ -107,6 +107,8 @@ proc retrieveCid(
     await stream.close()
 
 proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
+  let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
+
   router.rawApi(
     MethodPost,
     "/api/codex/v1/data") do (
@@ -166,6 +168,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
          Http400,
          $cid.error())
 
+      if corsOrigin =? allowedOrigin:
+        resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
+        resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
+        resp.setHeader("Access-Control-Headers", "X-Requested-With")
+        resp.setHeader("Access-Control-Max-Age", "86400")
+
       await node.retrieveCid(cid.get(), local = true, resp=resp)
 
   router.api(
@@ -181,6 +189,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
          Http400,
          $cid.error())
 
+      if corsOrigin =? allowedOrigin:
+        resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
+        resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
+        resp.setHeader("Access-Control-Headers", "X-Requested-With")
+        resp.setHeader("Access-Control-Max-Age", "86400")
+
       await node.retrieveCid(cid.get(), local = false, resp=resp)
 
   router.api(
@@ -664,9 +678,13 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
       trace "Excepting processing request", exc = exc.msg
       return RestApiResponse.error(Http500)
 
+proc initRestApi*(
+  node: CodexNodeRef,
+  conf: CodexConf,
+  repoStore: RepoStore,
+  corsAllowedOrigin: ?string): RestRouter =
 
-proc initRestApi*(node: CodexNodeRef, conf: CodexConf, repoStore: RepoStore): RestRouter =
-  var router = RestRouter.init(validate)
+  var router = RestRouter.init(validate, corsAllowedOrigin)
 
   initDataApi(node, repoStore, router)
   initSalesApi(node, router)
diff --git a/codex/rest/json.nim b/codex/rest/json.nim
index 142af9d2..fba708be 100644
--- a/codex/rest/json.nim
+++ b/codex/rest/json.nim
@@ -7,6 +7,7 @@ import ../sales
 import ../purchasing
 import ../utils/json
 import ../manifest
+import ../units
 
 export json
 
@@ -65,10 +66,10 @@ type
     id*: NodeId
 
   RestRepoStore* = object
-    totalBlocks* {.serialize.}: uint
-    quotaMaxBytes* {.serialize.}: uint
-    quotaUsedBytes* {.serialize.}: uint
-    quotaReservedBytes* {.serialize.}: uint
+    totalBlocks* {.serialize.}: Natural
+    quotaMaxBytes* {.serialize.}: NBytes
+    quotaUsedBytes* {.serialize.}: NBytes
+    quotaReservedBytes* {.serialize.}: NBytes
 
 proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList =
   RestContentList(
diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim
index 8ba762ea..0b5eaaf5 100644
--- a/codex/sales/reservations.nim
+++ b/codex/sales/reservations.nim
@@ -46,6 +46,7 @@ import ../stores
 import ../market
 import ../contracts/requests
 import ../utils/json
+import ../units
 
 export requests
 export logutils
@@ -53,6 +54,7 @@ export logutils
 logScope:
   topics = "sales reservations"
 
+
 type
   AvailabilityId* = distinct array[32, byte]
   ReservationId* = distinct array[32, byte]
@@ -71,7 +73,8 @@ type
    size* {.serialize.}: UInt256
    requestId* {.serialize.}: RequestId
    slotIndex* {.serialize.}: UInt256
-  Reservations* = ref object
+  Reservations* = ref object of RootObj
+    availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
    repo: RepoStore
    onAvailabilityAdded: ?OnAvailabilityAdded
  GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
@@ -95,12 +98,22 @@ const
  SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
  ReservationsKey = (SalesKey / "reservations").tryGet
 
+proc hash*(x: AvailabilityId): Hash {.borrow.}
 proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
 
+template withLock(lock, body) =
+  try:
+    await lock.acquire()
+    body
+  finally:
+    if lock.locked:
+      lock.release()
+
+
 proc new*(T: type Reservations, repo: RepoStore): Reservations =
-  T(repo: repo)
+  T(availabilityLock: newAsyncLock(), repo: repo)
 
 proc init*(
   _: type Availability,
@@ -166,16 +179,16 @@ func key*(availability: Availability): ?!Key =
 func key*(reservation: Reservation): ?!Key =
   return key(reservation.id, reservation.availabilityId)
 
-func available*(self: Reservations): uint = self.repo.available
+func available*(self: Reservations): uint = self.repo.available.uint
 
 func hasAvailable*(self: Reservations, bytes: uint): bool =
-  self.repo.available(bytes)
+  self.repo.available(bytes.NBytes)
 
 proc exists*(
   self: Reservations,
   key: Key): Future[bool] {.async.} =
-  let exists = await self.repo.metaDs.contains(key)
+  let exists = await self.repo.metaDs.ds.contains(key)
   return exists
 
 proc getImpl(
@@ -186,7 +199,7 @@ proc getImpl(
     let err = newException(NotExistsError, "object with key " & $key & " does not exist")
     return failure(err)
 
-  without serialized =? await self.repo.metaDs.get(key), error:
+  without serialized =? await self.repo.metaDs.ds.get(key), error:
     return failure(error.toErr(GetFailedError))
 
   return success serialized
@@ -213,7 +226,7 @@ proc updateImpl(
   without key =? obj.key, error:
     return failure(error)
 
-  if err =? (await self.repo.metaDs.put(
+  if err =? (await self.repo.metaDs.ds.put(
     key,
     @(obj.toJson.toBytes)
   )).errorOption:
@@ -221,20 +234,19 @@ proc updateImpl(
     return failure(err)
 
   return success()
 
-proc update*(
-  self: Reservations,
-  obj: Reservation): Future[?!void] {.async.} =
-  return await self.updateImpl(obj)
-
-proc update*(
+proc updateAvailability(
   self: Reservations,
   obj: Availability): Future[?!void] {.async.} =
 
+  logScope:
+    availabilityId = obj.id
+
   without key =? obj.key, error:
     return failure(error)
 
   without oldAvailability =? await self.get(key, Availability), err:
     if err of NotExistsError:
+      trace "Creating new Availability"
       let res = await self.updateImpl(obj)
       # inform subscribers that Availability has been added
       if onAvailabilityAdded =? self.onAvailabilityAdded:
@@ -248,20 +260,20 @@ proc update*(
       except CatchableError as e:
         # we don't have any insight into types of exceptions that
         # `onAvailabilityAdded` can raise because it is caller-defined
-        warn "Unknown error during 'onAvailabilityAdded' callback",
-          availabilityId = obj.id, error = e.msg
+        warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
 
       return res
     else:
       return failure(err)
 
   # Sizing of the availability changed, we need to adjust the repo reservation accordingly
   if oldAvailability.totalSize != obj.totalSize:
+    trace "totalSize changed, updating repo reservation"
     if oldAvailability.totalSize < obj.totalSize: # storage added
-      if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
+      if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes)).errorOption:
        return failure(reserveErr.toErr(ReserveFailedError))
    elif oldAvailability.totalSize > obj.totalSize: # storage removed
-      if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
+      if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes)).errorOption:
        return failure(reserveErr.toErr(ReleaseFailedError))
 
  let res = await self.updateImpl(obj)
@@ -280,11 +292,21 @@ proc update*(
      except CatchableError as e:
        # we don't have any insight into types of exceptions that
        # `onAvailabilityAdded` can raise because it is caller-defined
-        warn "Unknown error during 'onAvailabilityAdded' callback",
-          availabilityId = obj.id, error = e.msg
+        warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
 
  return res
 
+proc update*(
+  self: Reservations,
+  obj: Reservation): Future[?!void] {.async.} =
+  return await self.updateImpl(obj)
+
+proc update*(
+  self: Reservations,
+  obj: Availability): Future[?!void] {.async.} =
+  withLock(self.availabilityLock):
+    return await self.updateAvailability(obj)
+
 proc delete(
  self: Reservations,
  key: Key): Future[?!void] {.async.} =
@@ -294,7 +316,7 @@ proc delete(
  if not await self.exists(key):
    return success()
 
-  if err =? (await self.repo.metaDs.delete(key)).errorOption:
+  if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
    return failure(err.toErr(DeleteFailedError))
 
  return success()
@@ -312,31 +334,32 @@ proc deleteReservation*(
  without key =? key(reservationId, availabilityId), error:
    return failure(error)
 
-  without reservation =? (await self.get(key, Reservation)), error:
-    if error of NotExistsError:
-      return success()
-    else:
-      return failure(error)
+  withLock(self.availabilityLock):
+    without reservation =? (await self.get(key, Reservation)), error:
+      if error of NotExistsError:
+        return success()
+      else:
+        return failure(error)
 
-  if reservation.size > 0.u256:
-    trace "returning remaining reservation bytes to availability",
-      size = reservation.size
+    if reservation.size > 0.u256:
+      trace "returning remaining reservation bytes to availability",
+        size = reservation.size
 
-    without availabilityKey =? availabilityId.key, error:
-      return failure(error)
+      without availabilityKey =? availabilityId.key, error:
+        return failure(error)
 
-    without var availability =? await self.get(availabilityKey, Availability), error:
-      return failure(error)
+      without var availability =? await self.get(availabilityKey, Availability), error:
+        return failure(error)
 
-    availability.freeSize += reservation.size
+      availability.freeSize += reservation.size
 
-    if updateErr =? (await self.update(availability)).errorOption:
-      return failure(updateErr)
+      if updateErr =? (await self.updateAvailability(availability)).errorOption:
+        return failure(updateErr)
 
-  if err =? (await self.repo.metaDs.delete(key)).errorOption:
-    return failure(err.toErr(DeleteFailedError))
+    if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
+      return failure(err.toErr(DeleteFailedError))
 
-  return success()
+    return success()
 
 # TODO: add support for deleting availabilities
 # To delete, must not have any active sales.
@@ -355,14 +378,14 @@ proc createAvailability*(
  )
 
  let bytes = availability.freeSize.truncate(uint)
 
-  if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
+  if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
    return failure(reserveErr.toErr(ReserveFailedError))
 
  if updateErr =? (await self.update(availability)).errorOption:
 
    # rollback the reserve
    trace "rolling back reserve"
-    if rollbackErr =? (await self.repo.release(bytes)).errorOption:
+    if rollbackErr =? (await self.repo.release(bytes.NBytes)).errorOption:
      rollbackErr.parent = updateErr
      return failure(rollbackErr)
 
@@ -370,54 +393,57 @@ proc createAvailability*(
 
  return success(availability)
 
-proc createReservation*(
+method createReservation*(
  self: Reservations,
  availabilityId: AvailabilityId,
  slotSize: UInt256,
  requestId: RequestId,
  slotIndex: UInt256
-): Future[?!Reservation] {.async.} =
+): Future[?!Reservation] {.async, base.} =
 
-  trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
+  withLock(self.availabilityLock):
+    without availabilityKey =? availabilityId.key, error:
+      return failure(error)
 
-  let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
+    without availability =? await self.get(availabilityKey, Availability), error:
+      return failure(error)
 
-  without availabilityKey =? availabilityId.key, error:
-    return failure(error)
+    # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availability modifications
+    if availability.freeSize < slotSize:
+      let error = newException(
+        BytesOutOfBoundsError,
+        "trying to reserve an amount of bytes that is greater than the total size of the Availability")
+      return failure(error)
 
-  without var availability =? await self.get(availabilityKey, Availability), error:
-    return failure(error)
+    trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
 
-  if availability.freeSize < slotSize:
-    let error = newException(
-      BytesOutOfBoundsError,
-      "trying to reserve an amount of bytes that is greater than the total size of the Availability")
-    return failure(error)
+    let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
 
-  if createResErr =? (await self.update(reservation)).errorOption:
-    return failure(createResErr)
+    if createResErr =? (await self.update(reservation)).errorOption:
+      return failure(createResErr)
 
-  # reduce availability freeSize by the slot size, which is now accounted for in
-  # the newly created Reservation
-  availability.freeSize -= slotSize
+    # reduce availability freeSize by the slot size, which is now accounted for in
+    # the newly created Reservation
+    availability.freeSize -= slotSize
 
-  # update availability with reduced size
-  if updateErr =? (await self.update(availability)).errorOption:
+    # update availability with reduced size
+    trace "Updating availability with reduced size"
+    if updateErr =? (await self.updateAvailability(availability)).errorOption:
+      trace "Updating availability failed, rolling back reservation creation"
 
-    trace "rolling back reservation creation"
+      without key =? reservation.key, keyError:
+        keyError.parent = updateErr
+        return failure(keyError)
 
-    without key =? reservation.key, keyError:
-      keyError.parent = updateErr
-      return failure(keyError)
+      # rollback the reservation creation
+      if rollbackErr =? (await self.delete(key)).errorOption:
+        rollbackErr.parent = updateErr
+        return failure(rollbackErr)
 
-    # rollback the reservation creation
-    if rollbackErr =? (await self.delete(key)).errorOption:
-      rollbackErr.parent = updateErr
-      return failure(rollbackErr)
+      return failure(updateErr)
 
-    return failure(updateErr)
-
-  return success(reservation)
+    trace "Reservation successfully created"
+    return success(reservation)
 
 proc returnBytesToAvailability*(
  self: Reservations,
@@ -429,48 +455,48 @@ proc returnBytesToAvailability*(
    reservationId
    availabilityId
 
+  withLock(self.availabilityLock):
+    without key =? key(reservationId, availabilityId), error:
+      return failure(error)
 
-  without key =? key(reservationId, availabilityId), error:
-    return failure(error)
+    without var reservation =? (await self.get(key, Reservation)), error:
+      return failure(error)
 
-  without var reservation =? (await self.get(key, Reservation)), error:
-    return failure(error)
+    # We are ignoring bytes that are still present in the Reservation because
+    # they will be returned to Availability through `deleteReservation`.
+    let bytesToBeReturned = bytes - reservation.size
 
-  # We are ignoring bytes that are still present in the Reservation because
-  # they will be returned to Availability through `deleteReservation`.
-  let bytesToBeReturned = bytes - reservation.size
+    if bytesToBeReturned == 0:
+      trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
+      return success()
+
+    trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
+
+    # First let's see if we can re-reserve the bytes, if the Repo's quota
+    # is depleted then we will fail-fast as there is nothing to be done atm.
+    if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
+      return failure(reserveErr.toErr(ReserveFailedError))
+
+    without availabilityKey =? availabilityId.key, error:
+      return failure(error)
+
+    without var availability =? await self.get(availabilityKey, Availability), error:
+      return failure(error)
+
+    availability.freeSize += bytesToBeReturned
+
+    # Update availability with returned size
+    if updateErr =? (await self.updateAvailability(availability)).errorOption:
+
+      trace "Rolling back returning bytes"
+      if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint).NBytes)).errorOption:
+        rollbackErr.parent = updateErr
+        return failure(rollbackErr)
+
+      return failure(updateErr)
 
-  if bytesToBeReturned == 0:
-    trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
    return success()
 
-  trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
-
-  # First lets see if we can re-reserve the bytes, if the Repo's quota
-  # is depleted then we will fail-fast as there is nothing to be done atm.
-  if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
-    return failure(reserveErr.toErr(ReserveFailedError))
-
-  without availabilityKey =? availabilityId.key, error:
-    return failure(error)
-
-  without var availability =? await self.get(availabilityKey, Availability), error:
-    return failure(error)
-
-  availability.freeSize += bytesToBeReturned
-
-  # Update availability with returned size
-  if updateErr =? (await self.update(availability)).errorOption:
-
-    trace "Rolling back returning bytes"
-    if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
-      rollbackErr.parent = updateErr
-      return failure(rollbackErr)
-
-    return failure(updateErr)
-
-  return success()
-
 proc release*(
  self: Reservations,
  reservationId: ReservationId,
@@ -497,7 +523,7 @@ proc release*(
      "trying to release an amount of bytes that is greater than the total size of the Reservation")
    return failure(error)
 
-  if releaseErr =? (await self.repo.release(bytes)).errorOption:
+  if releaseErr =? (await self.repo.release(bytes.NBytes)).errorOption:
    return failure(releaseErr.toErr(ReleaseFailedError))
 
  reservation.size -= bytes.u256
@@ -507,7 +533,7 @@ proc release*(
    # rollback release if an update error encountered
    trace "rolling back release"
-    if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
+    if rollbackErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
      rollbackErr.parent = err
      return failure(rollbackErr)
    return failure(err)
@@ -537,7 +563,7 @@ proc storables(
    else:
      raiseAssert "unknown type"
 
-  without results =? await self.repo.metaDs.query(query), error:
+  without results =? await self.repo.metaDs.ds.query(query), error:
    return failure(error)
 
  # /sales/reservations
@@ -621,6 +647,7 @@ proc findAvailability*(
      minPrice >= availability.minPrice:
 
      trace "availability matched",
+        id = availability.id,
        size, availFreeSize = availability.freeSize,
        duration, availDuration = availability.duration,
        minPrice, availMinPrice = availability.minPrice,
@@ -635,8 +662,8 @@ proc findAvailability*(
      return some availability
 
    trace "availability did not match",
+      id = availability.id,
      size, availFreeSize = availability.freeSize,
      duration, availDuration = availability.duration,
      minPrice, availMinPrice = availability.minPrice,
      collateral, availMaxCollateral = availability.maxCollateral
-
diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim
index 973446e2..e5a441d3 100644
--- a/codex/sales/states/preparing.nim
+++ b/codex/sales/states/preparing.nim
@@ -1,5 +1,6 @@
 import pkg/questionable
 import pkg/questionable/results
+import pkg/metrics
 
 import ../../logutils
 import ../../market
@@ -13,6 +14,8 @@ import ./ignored
 import ./downloading
 import ./errored
 
+declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
+
 type
  SalePreparing* = ref object of ErrorHandlingState
 
@@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
      request.id,
      data.slotIndex
    ), error:
+      trace "Creation of reservation failed"
+      # Race condition:
+      # reservations.findAvailability (line 64) is no guarantee: we can never know
+      # for certain that the reservation can be created until we actually have it.
+      # Should createReservation fail because there's no space, we proceed to SaleIgnored.
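
The reason this race stays harmless is that createReservation (reservations.nim, above) re-reads the availability under availabilityLock before committing any bytes. Below is a minimal, self-contained Nim sketch of that check-under-lock pattern: the withLock template mirrors the one this PR adds to reservations.nim, while Avail, freeBytes and tryReserve are hypothetical names used for illustration only, not project code.

  import pkg/chronos

  type Avail = ref object
    freeBytes: int
    lock: AsyncLock

  template withLock(lock, body) =
    # mirrors reservations.nim: always release, even when body raises
    try:
      await lock.acquire()
      body
    finally:
      if lock.locked:
        lock.release()

  proc tryReserve(a: Avail, wanted: int): Future[bool] {.async.} =
    withLock(a.lock):
      # re-check under the lock: an unlocked lookup made earlier is no
      # guarantee by the time we get here
      if a.freeBytes < wanted:
        return false          # analogue of BytesOutOfBoundsError
      a.freeBytes -= wanted   # no other reservation can interleave here
      return true

A false result corresponds to the BytesOutOfBoundsError branch that follows, which this state turns into SaleIgnored rather than SaleErrored.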
+      if error of BytesOutOfBoundsError:
+        # Let's monitor how often this happens and if it is often we can make it more intelligent to handle it
+        codex_reservations_availability_mismatch.inc()
+        return some State(SaleIgnored())
+
       return some State(SaleErrored(error: error))
 
+    trace "Reservation created successfully"
+
     data.reservation = some reservation
     return some State(SaleDownloading())
diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim
index 76193a53..63c6ba40 100644
--- a/codex/stores/maintenance.nim
+++ b/codex/stores/maintenance.nim
@@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} =
     trace "Unable to delete block from repoStore"
 
 proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
-  if be.expiration < self.clock.now:
+  if be.expiry < self.clock.now:
     await self.deleteExpiredBlock(be.cid)
   else:
     inc self.offset
@@ -75,11 +75,11 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
     return
 
   var numberReceived = 0
-  for maybeBeFuture in iter:
-    if be =? await maybeBeFuture:
-      inc numberReceived
-      await self.processBlockExpiration(be)
-      await sleepAsync(50.millis)
+  for beFut in iter:
+    let be = await beFut
+    inc numberReceived
+    await self.processBlockExpiration(be)
+    await sleepAsync(1.millis) # cooperative scheduling
 
   # If we received fewer blockExpirations from the iterator than we asked for,
   # We're at the end of the dataset and should start from 0 next time.
diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim
index 357e9401..7c51d215 100644
--- a/codex/stores/queryiterhelper.nim
+++ b/codex/stores/queryiterhelper.nim
@@ -6,7 +6,7 @@ import pkg/datastore/typedds
 
 import ../utils/asynciter
 
-type KeyVal[T] = tuple[key: Key, value: T]
+type KeyVal*[T] = tuple[key: Key, value: T]
 
 proc toAsyncIter*[T](
   queryIter: QueryIter[T],
diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim
index c129710f..5937cbfc 100644
--- a/codex/stores/repostore.nim
+++ b/codex/stores/repostore.nim
@@ -1,679 +1,5 @@
-## Nim-Codex
-## Copyright (c) 2022 Status Research & Development GmbH
-## Licensed under either of
-##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
-##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
-## at your option.
-## This file may not be copied, modified, or distributed except according to
-## those terms.
+import ./repostore/store
+import ./repostore/types
+import ./repostore/coders
-
-import pkg/upraises
-
-push: {.upraises: [].}
-
-import pkg/chronos
-import pkg/chronos/futures
-import pkg/libp2p/[cid, multicodec, multihash]
-import pkg/lrucache
-import pkg/metrics
-import pkg/questionable
-import pkg/questionable/results
-import pkg/datastore
-import pkg/stew/endians2
-
-import ./blockstore
-import ./keyutils
-import ../blocktype
-import ../clock
-import ../systemclock
-import ../logutils
-import ../merkletree
-import ../utils
-
-export blocktype, cid
-
-logScope:
-  topics = "codex repostore"
-
-declareGauge(codex_repostore_blocks, "codex repostore blocks")
-declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
-declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
-
-const
-  DefaultBlockTtl* = 24.hours
-  DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
-
-type
-  QuotaUsedError* = object of CodexError
-  QuotaNotEnoughError* = object of CodexError
-
-  RepoStore* = ref object of BlockStore
-    postFixLen*: int
-    repoDs*: Datastore
-    metaDs*: Datastore
-    clock: Clock
-    totalBlocks*: uint         # number of blocks in the store
-    quotaMaxBytes*: uint       # maximum available bytes
-    quotaUsedBytes*: uint      # bytes used by the repo
-    quotaReservedBytes*: uint  # bytes reserved by the repo
-    blockTtl*: Duration
-    started*: bool
-
-  BlockExpiration* = object
-    cid*: Cid
-    expiration*: SecondsSince1970
-
-proc updateMetrics(self: RepoStore) =
-  codex_repostore_blocks.set(self.totalBlocks.int64)
-  codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
-  codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
-
-func totalUsed*(self: RepoStore): uint =
-  (self.quotaUsedBytes + self.quotaReservedBytes)
-
-func available*(self: RepoStore): uint =
-  return self.quotaMaxBytes - self.totalUsed
-
-func available*(self: RepoStore, bytes: uint): bool =
-  return bytes < self.available()
-
-proc encode(cidAndProof: (Cid, CodexProof)): seq[byte] =
-  ## Encodes a tuple of cid and merkle proof in a following format:
-  ## | 8-bytes | n-bytes | remaining bytes |
-  ## |    n    |   cid   |      proof      |
-  ##
-  ## where n is a size of cid
-  ##
-  let
-    (cid, proof) = cidAndProof
-    cidBytes = cid.data.buffer
-    proofBytes = proof.encode
-    n = cidBytes.len
-    nBytes = n.uint64.toBytesBE
-
-  @nBytes & cidBytes & proofBytes
-
-proc decode(_: type (Cid, CodexProof), data: seq[byte]): ?!(Cid, CodexProof) =
-  let
-    n = uint64.fromBytesBE(data[0.. self.quotaMaxBytes:
-    error "Cannot store block, quota used!", used = self.totalUsed
-    return failure(
-      newException(QuotaUsedError, "Cannot store block, quota used!"))
-
-  var
-    batch: seq[BatchEntry]
-
-  let
-    used = self.quotaUsedBytes + blk.data.len.uint
-
-  if err =? (await self.repoDs.put(key, blk.data)).errorOption:
-    error "Error storing block", err = err.msg
-    return failure(err)
-
-  batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
-
-  without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
-    warn "Unable to create block expiration metadata key", err = err.msg
-    return failure(err)
-  batch.add(blockExpEntry)
-
-  if err =? (await self.metaDs.put(batch)).errorOption:
-    error "Error updating quota bytes", err = err.msg
-
-    if err =? (await self.repoDs.delete(key)).errorOption:
-      error "Error deleting block after failed quota update", err = err.msg
-      return failure(err)
-
-    return failure(err)
-
-  self.quotaUsedBytes = used
-  inc self.totalBlocks
-  if isErr (await self.persistTotalBlocksCount()):
-    warn "Unable to update block total metadata"
-    return failure("Unable to update block total metadata")
-
-  self.updateMetrics()
-  return success()
-
-proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
-  let used = self.quotaUsedBytes - blk.data.len.uint
-  if err =? (await self.metaDs.put(
-    QuotaUsedKey,
-    @(used.uint64.toBytesBE))).errorOption:
-    trace "Error updating quota key!", err = err.msg
-    return failure(err)
-  self.quotaUsedBytes = used
-  self.updateMetrics()
-  return success()
-
-proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
-  without key =? createBlockExpirationMetadataKey(cid), err:
-    return failure(err)
-  return await self.metaDs.delete(key)
-
-method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
-  ## Delete a block from the blockstore
-  ##
-
-  logScope:
-    cid = cid
-
-  trace "Deleting block"
-
-  if cid.isEmpty:
-    trace "Empty block, ignoring"
-    return success()
-
-  if blk =? (await self.getBlock(cid)):
-    if key =? makePrefixKey(self.postFixLen, cid) and
-      err =? (await self.repoDs.delete(key)).errorOption:
-      trace "Error deleting block!", err = err.msg
-      return failure(err)
-
-    if isErr (await self.updateQuotaBytesUsed(blk)):
-      trace "Unable to update quote-bytes-used in metadata store"
-      return failure("Unable to update quote-bytes-used in metadata store")
-
-    if isErr (await self.removeBlockExpirationEntry(blk.cid)):
-      trace "Unable to remove block expiration entry from metadata store"
-      return failure("Unable to remove block expiration entry from metadata store")
-
-    trace "Deleted block", cid, totalUsed = self.totalUsed
-
-  dec self.totalBlocks
-  if isErr (await self.persistTotalBlocksCount()):
-    trace "Unable to update block total metadata"
-    return failure("Unable to update block total metadata")
-
-  self.updateMetrics()
-  return success()
-
-method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
-  without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
-    return failure(err)
-
-  trace "Fetching proof", key
-  without value =? await self.metaDs.get(key), err:
-    if err of DatastoreKeyNotFound:
-      return success()
-    else:
-      return failure(err)
-
-  without cid =? (Cid, CodexProof).decodeCid(value), err:
-    return failure(err)
-
-  trace "Deleting block", cid
-  if err =? (await self.delBlock(cid)).errorOption:
-    return failure(err)
-
-  await self.metaDs.delete(key)
-
-method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
-  ## Check if the block exists in the blockstore
-  ##
-
-  logScope:
-    cid = cid
-
-  if cid.isEmpty:
-    trace "Empty block, ignoring"
-    return success true
-
-  without key =? makePrefixKey(self.postFixLen, cid), err:
-    trace "Error getting key from provider", err = err.msg
-    return failure(err)
-
-  return await self.repoDs.has(key)
-
-method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
-  without cid =? await self.getCid(treeCid, index), err:
-    if err of BlockNotFoundError:
-      return success(false)
-    else:
-      return failure(err)
-
-  await self.hasBlock(cid)
-
-method listBlocks*(
-  self: RepoStore,
-  blockType = BlockType.Manifest
-): Future[?!AsyncIter[?Cid]] {.async.} =
-  ## Get the list of blocks in the RepoStore.
-  ## This is an intensive operation
-  ##
-
-  var
-    iter = AsyncIter[?Cid]()
-
-  let key =
-    case blockType:
-    of BlockType.Manifest: CodexManifestKey
-    of BlockType.Block: CodexBlocksKey
-    of BlockType.Both: CodexRepoKey
-
-  let query = Query.init(key, value=false)
-  without queryIter =? (await self.repoDs.query(query)), err:
-    trace "Error querying cids in repo", blockType, err = err.msg
-    return failure(err)
-
-  proc next(): Future[?Cid] {.async.} =
-    await idleAsync()
-    if queryIter.finished:
-      iter.finish
-    else:
-      if pair =? (await queryIter.next()) and cid =? pair.key:
-        doAssert pair.data.len == 0
-        trace "Retrieved record from repo", cid
-        return Cid.init(cid.value).option
-      else:
-        return Cid.none
-
-  iter.next = next
-  return success iter
-
-proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
-  let queryKey = ? createBlockExpirationMetadataQueryKey()
-  success Query.init(queryKey, offset = offset, limit = maxNumber)
-
-method getBlockExpirations*(
-  self: RepoStore,
-  maxNumber: int,
-  offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
-  ## Get block expirations from the given RepoStore
-  ##
-
-  without query =? createBlockExpirationQuery(maxNumber, offset), err:
-    trace "Unable to format block expirations query"
-    return failure(err)
-
-  without queryIter =? (await self.metaDs.query(query)), err:
-    trace "Unable to execute block expirations query"
-    return failure(err)
-
-  var iter = AsyncIter[?BlockExpiration]()
-
-  proc next(): Future[?BlockExpiration] {.async.} =
-    if not queryIter.finished:
-      if pair =? (await queryIter.next()) and blockKey =? pair.key:
-        let expirationTimestamp = pair.data
-        let cidResult = Cid.init(blockKey.value)
-        if not cidResult.isOk:
-          raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
-        return BlockExpiration(
-          cid: cidResult.get,
-          expiration: expirationTimestamp.toSecondsSince1970
-        ).some
-      else:
-        discard await queryIter.dispose()
-        iter.finish
-    return BlockExpiration.none
-
-  iter.next = next
-  return success iter
-
-method close*(self: RepoStore): Future[void] {.async.} =
-  ## Close the blockstore, cleaning up resources managed by it.
-  ## For some implementations this may be a no-op
-  ##
-
-  trace "Closing repostore"
-
-  if not self.metaDs.isNil:
-    (await self.metaDs.close()).expect("Should meta datastore")
-
-  if not self.repoDs.isNil:
-    (await self.repoDs.close()).expect("Should repo datastore")
-
-proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
-  ## Reserve bytes
-  ##
-
-  trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes
-
-  if (self.totalUsed + bytes) > self.quotaMaxBytes:
-    trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes
-    return failure(
-      newException(QuotaNotEnoughError, "Not enough storage quota to reserver"))
-
-  self.quotaReservedBytes += bytes
-  if err =? (await self.metaDs.put(
-    QuotaReservedKey,
-    @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
-
-    trace "Error reserving bytes", err = err.msg
-
-    self.quotaReservedBytes += bytes
-    return failure(err)
-
-  return success()
-
-proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
-  ## Release bytes
-  ##
-
-  trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes
-
-  if (self.quotaReservedBytes.int - bytes.int) < 0:
-    trace "Cannot release this many bytes",
-      quotaReservedBytes = self.quotaReservedBytes, bytes
-
-    return failure("Cannot release this many bytes")
-
-  self.quotaReservedBytes -= bytes
-  if err =? (await self.metaDs.put(
-    QuotaReservedKey,
-    @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:
-
-    trace "Error releasing bytes", err = err.msg
-
-    self.quotaReservedBytes -= bytes
-
-    return failure(err)
-
-  trace "Released bytes", bytes
-  self.updateMetrics()
-  return success()
-
-proc start*(self: RepoStore): Future[void] {.async.} =
-  ## Start repo
-  ##
-
-  if self.started:
-    trace "Repo already started"
-    return
-
-  trace "Starting repo"
-
-  without total =? await self.metaDs.get(CodexTotalBlocksKey), err:
-    if not (err of DatastoreKeyNotFound):
-      error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey
-
-  if total.len > 0:
-    self.totalBlocks = uint64.fromBytesBE(total).uint
-  trace "Number of blocks in store at start", total = self.totalBlocks
-
-  ## load current persist and cache bytes from meta ds
-  without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err:
-    if not (err of DatastoreKeyNotFound):
-      error "Error getting cache bytes from datastore",
-        err = err.msg, key = $QuotaUsedKey
-
-      raise newException(Defect, err.msg)
-
-  if quotaUsedBytes.len > 0:
-    self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint
-
-  notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes
-
-  without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err:
-    if not (err of DatastoreKeyNotFound):
-      error "Error getting persist bytes from datastore",
-        err = err.msg, key = $QuotaReservedKey
-
-      raise newException(Defect, err.msg)
-
-  if quotaReservedBytes.len > 0:
-    self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint
-
-  if self.quotaUsedBytes > self.quotaMaxBytes:
-    raiseAssert "All storage quota used, increase storage quota!"
-
-  notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes
-
-  self.updateMetrics()
-  self.started = true
-
-proc stop*(self: RepoStore): Future[void] {.async.} =
-  ## Stop repo
-  ##
-  if not self.started:
-    trace "Repo is not started"
-    return
-
-  trace "Stopping repo"
-  await self.close()
-
-  self.started = false
-
-func new*(
-  T: type RepoStore,
-  repoDs: Datastore,
-  metaDs: Datastore,
-  clock: Clock = SystemClock.new(),
-  postFixLen = 2,
-  quotaMaxBytes = DefaultQuotaBytes,
-  blockTtl = DefaultBlockTtl
-): RepoStore =
-  ## Create new instance of a RepoStore
-  ##
-  RepoStore(
-    repoDs: repoDs,
-    metaDs: metaDs,
-    clock: clock,
-    postFixLen: postFixLen,
-    quotaMaxBytes: quotaMaxBytes,
-    blockTtl: blockTtl
-  )
+export store, types, coders
diff --git a/codex/stores/repostore/coders.nim b/codex/stores/repostore/coders.nim
new file mode 100644
index 00000000..6fc78408
--- /dev/null
+++ b/codex/stores/repostore/coders.nim
@@ -0,0 +1,47 @@
+## Nim-Codex
+## Copyright (c) 2024 Status Research & Development GmbH
+## Licensed under either of
+##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+##
+
+import std/sugar
+import pkg/libp2p/cid
+import pkg/serde/json
+import pkg/stew/byteutils
+import pkg/stew/endians2
+
+import ./types
+import ../../errors
+import ../../merkletree
+import ../../utils/json
+
+proc encode*(t: QuotaUsage): seq[byte] = t.toJson().toBytes()
+proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = T.fromJson(bytes)
+
+proc encode*(t: BlockMetadata): seq[byte] = t.toJson().toBytes()
+proc decode*(T: type BlockMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
+
+proc encode*(t: LeafMetadata): seq[byte] = t.toJson().toBytes()
+proc decode*(T: type LeafMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
+
+proc encode*(t: DeleteResult): seq[byte] = t.toJson().toBytes()
+proc decode*(T: type DeleteResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
+
+proc encode*(t: StoreResult): seq[byte] = t.toJson().toBytes()
+proc decode*(T: type StoreResult, bytes: seq[byte]): ?!T = T.fromJson(bytes)
+
+proc encode*(i: uint64): seq[byte] =
+  @(i.toBytesBE)
+
+proc decode*(T: type uint64, bytes: seq[byte]): ?!T =
+  if bytes.len >= sizeof(uint64):
+    success(uint64.fromBytesBE(bytes))
+  else:
+    failure("Not enough bytes to decode `uint64`")
+
+proc encode*(i: Natural | enum): seq[byte] = cast[uint64](i).encode
+proc decode*(T: typedesc[Natural | enum], bytes: seq[byte]): ?!T = uint64.decode(bytes).map((ui: uint64) => cast[T](ui))
diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim
new file mode 100644
index 00000000..e000bb0a
--- /dev/null
+++ b/codex/stores/repostore/operations.nim
@@ -0,0 +1,213 @@
+## Nim-Codex
+## Copyright (c) 2024 Status Research & Development GmbH
+## Licensed under either of
+##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
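
Everything in operations.nim below is built on nim-datastore's typed-datastore helpers, together with the codecs from coders.nim above: modify maps the current value (?T) to a new value through an async callback, while modifyGet additionally threads an auxiliary result (such as a StoreResultKind) out of the callback. Here is a minimal sketch of the callback shape; bumpCounter and its key are hypothetical, but the modifyGet call mirrors how this file uses it:

  import pkg/chronos
  import pkg/datastore
  import pkg/datastore/typedds
  import pkg/questionable
  import pkg/questionable/results

  # Hypothetical helper: atomically bump a stored uint64 counter and report
  # whether the key was seen for the first time. Relies on the uint64
  # encode/decode pair defined in coders.nim above.
  proc bumpCounter(ds: TypedDatastore, key: Key): Future[?!bool] {.async.} =
    await ds.modifyGet(key,
      proc (maybeCurr: ?uint64): Future[(?uint64, bool)] {.async.} =
        if curr =? maybeCurr:
          ((curr + 1).some, false)  # existing entry: increment it
        else:
          (1'u64.some, true)        # first sighting: initialise to 1
    )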
+
+import pkg/chronos
+import pkg/chronos/futures
+import pkg/datastore
+import pkg/datastore/typedds
+import pkg/libp2p/cid
+import pkg/metrics
+import pkg/questionable
+import pkg/questionable/results
+
+import ./coders
+import ./types
+import ../blockstore
+import ../keyutils
+import ../../blocktype
+import ../../clock
+import ../../logutils
+import ../../merkletree
+
+logScope:
+  topics = "codex repostore"
+
+declareGauge(codex_repostore_blocks, "codex repostore blocks")
+declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
+declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
+
+proc putLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof): Future[?!StoreResultKind] {.async.} =
+  without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
+    return failure(err)
+
+  await self.metaDs.modifyGet(key,
+    proc (maybeCurrMd: ?LeafMetadata): Future[(?LeafMetadata, StoreResultKind)] {.async.} =
+      var
+        md: LeafMetadata
+        res: StoreResultKind
+
+      if currMd =? maybeCurrMd:
+        md = currMd
+        res = AlreadyInStore
+      else:
+        md = LeafMetadata(blkCid: blkCid, proof: proof)
+        res = Stored
+
+      (md.some, res)
+  )
+
+proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!LeafMetadata] {.async.} =
+  without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
+    return failure(err)
+
+  without leafMd =? await get[LeafMetadata](self.metaDs, key), err:
+    if err of DatastoreKeyNotFound:
+      return failure(newException(BlockNotFoundError, err.msg))
+    else:
+      return failure(err)
+
+  success(leafMd)
+
+proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} =
+  await self.metaDs.modify(CodexTotalBlocksKey,
+    proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
+      let count: Natural =
+        if currCount =? maybeCurrCount:
+          currCount + plusCount - minusCount
+        else:
+          plusCount - minusCount
+
+      self.totalBlocks = count
+      codex_repostore_blocks.set(count.int64)
+      count.some
+  )
+
+proc updateQuotaUsage*(
+  self: RepoStore,
+  plusUsed: NBytes = 0.NBytes,
+  minusUsed: NBytes = 0.NBytes,
+  plusReserved: NBytes = 0.NBytes,
+  minusReserved: NBytes = 0.NBytes
+): Future[?!void] {.async.} =
+  await self.metaDs.modify(QuotaUsedKey,
+    proc (maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
+      var usage: QuotaUsage
+
+      if currUsage =? maybeCurrUsage:
+        usage = QuotaUsage(used: currUsage.used + plusUsed - minusUsed, reserved: currUsage.reserved + plusReserved - minusReserved)
+      else:
+        usage = QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
+
+      if usage.used + usage.reserved > self.quotaMaxBytes:
+        raise newException(QuotaNotEnoughError,
+          "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
+          $usage.reserved & ", limit: " & $self.quotaMaxBytes)
+      else:
+        self.quotaUsage = usage
+        codex_repostore_bytes_used.set(usage.used.int64)
+        codex_repostore_bytes_reserved.set(usage.reserved.int64)
+        return usage.some
+  )
+
+proc updateBlockMetadata*(
+  self: RepoStore,
+  cid: Cid,
+  plusRefCount: Natural = 0,
+  minusRefCount: Natural = 0,
+  minExpiry: SecondsSince1970 = 0
+): Future[?!void] {.async.} =
+  if cid.isEmpty:
+    return success()
+
+  without metaKey =? createBlockExpirationMetadataKey(cid), err:
+    return failure(err)
+
+  await self.metaDs.modify(metaKey,
+    proc (maybeCurrBlockMd: ?BlockMetadata): Future[?BlockMetadata] {.async.} =
maybeCurrBlockMd: + BlockMetadata( + size: currBlockMd.size, + expiry: max(currBlockMd.expiry, minExpiry), + refCount: currBlockMd.refCount + plusRefCount - minusRefCount + ).some + else: + raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found") + ) + +proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} = + if blk.isEmpty: + return success(StoreResult(kind: AlreadyInStore)) + + without metaKey =? createBlockExpirationMetadataKey(blk.cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, StoreResult)] {.async.} = + var + md: BlockMetadata + res: StoreResult + + if currMd =? maybeCurrMd: + if currMd.size == blk.data.len.NBytes: + md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount) + res = StoreResult(kind: AlreadyInStore) + + # making sure that the block actually is stored in the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if not hasBlock: + warn "Block metadata is present, but block is absent. Restoring block.", cid = blk.cid + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + else: + raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid) + else: + md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0) + res = StoreResult(kind: Stored, used: blk.data.len.NBytes) + if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: + raise err + + (md.some, res) + ) + +proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} = + if cid.isEmpty: + return success(DeleteResult(kind: InUse)) + + without metaKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without blkKey =? makePrefixKey(self.postFixLen, cid), err: + return failure(err) + + await self.metaDs.modifyGet(metaKey, + proc (maybeCurrMd: ?BlockMetadata): Future[(?BlockMetadata, DeleteResult)] {.async.} = + var + maybeMeta: ?BlockMetadata + res: DeleteResult + + if currMd =? maybeCurrMd: + if currMd.refCount == 0 or currMd.expiry < expiryLimit: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: Deleted, released: currMd.size) + + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + else: + maybeMeta = currMd.some + res = DeleteResult(kind: InUse) + else: + maybeMeta = BlockMetadata.none + res = DeleteResult(kind: NotFound) + + # making sure that the block actually is removed from the repoDs + without hasBlock =? await self.repoDs.has(blkKey), err: + raise err + + if hasBlock: + warn "Block metadata is absent, but block is present. Removing block.", cid + if err =? (await self.repoDs.delete(blkKey)).errorOption: + raise err + + (maybeMeta, res) + ) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim new file mode 100644 index 00000000..7d629131 --- /dev/null +++ b/codex/stores/repostore/store.nim @@ -0,0 +1,395 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option.
+## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronos/futures +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/[cid, multicodec] +import pkg/questionable +import pkg/questionable/results + +import ./coders +import ./types +import ./operations +import ../blockstore +import ../keyutils +import ../queryiterhelper +import ../../blocktype +import ../../clock +import ../../logutils +import ../../merkletree +import ../../utils + +export blocktype, cid + +logScope: + topics = "codex repostore" + +########################################################### +# BlockStore API +########################################################### + +method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = + ## Get a block from the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return cid.emptyBlock + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + without data =? await self.repoDs.get(key), err: + if not (err of DatastoreKeyNotFound): + trace "Error getting block from datastore", err = err.msg, key + return failure(err) + + return failure(newException(BlockNotFoundError, err.msg)) + + trace "Got block for cid", cid + return Block.new(cid, data, verify = true) + +method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + without blk =? await self.getBlock(leafMd.blkCid), err: + return failure(err) + + success((blk, leafMd.proof)) + +method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.getBlock(leafMd.blkCid) + +method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = + ## Get a block from the blockstore + ## + + if address.leaf: + self.getBlock(address.treeCid, address.index) + else: + self.getBlock(address.cid) + +method ensureExpiry*( + self: RepoStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that the block's associated expiry is at least the given timestamp. + ## If the current expiry is lower, it is updated to the given one; otherwise it is left intact. + ## + + if expiry <= 0: + return failure(newException(ValueError, "Expiry timestamp must be larger than zero")) + + await self.updateBlockMetadata(cid, minExpiry = expiry) + +method ensureExpiry*( + self: RepoStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that the block's associated expiry is at least the given timestamp. + ## If the current expiry is lower, it is updated to the given one; otherwise it is left intact. + ## + + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + await self.ensureExpiry(leafMd.blkCid, expiry) + +method putCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural, + blkCid: Cid, + proof: CodexProof +): Future[?!void] {.async.} = + ## Store the block CID and inclusion proof for the given tree and leaf index + ## + + logScope: + treeCid = treeCid + index = index + blkCid = blkCid + + trace "Storing LeafMetadata" + + without res =?
await self.putLeafMetadata(treeCid, index, blkCid, proof), err: + return failure(err) + + if blkCid.mcodec == BlockCodec: + if res == Stored: + if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption: + return failure(err) + trace "Leaf metadata stored, block refCount incremented" + else: + trace "Leaf metadata already exists" + + return success() + +method getCidAndProof*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!(Cid, CodexProof)] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success((leafMd.blkCid, leafMd.proof)) + +method getCid*( + self: RepoStore, + treeCid: Cid, + index: Natural +): Future[?!Cid] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + return failure(err) + + success(leafMd.blkCid) + +method putBlock*( + self: RepoStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + logScope: + cid = blk.cid + + let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds + + without res =? await self.storeBlock(blk, expiry), err: + return failure(err) + + if res.kind == Stored: + trace "Block Stored" + if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption: + # rollback changes + without delRes =? await self.tryDeleteBlock(blk.cid), err: + return failure(err) + return failure(err) + + if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: + return failure(err) + else: + trace "Block already exists" + + return success() + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + trace "Attempting to delete a block" + + without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: + return failure(err) + + if res.kind == Deleted: + trace "Block deleted" + if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption: + return failure(err) + + if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: + return failure(err) + elif res.kind == InUse: + trace "Block in use, refCount > 0 and not expired" + else: + trace "Block not found in store" + + return success() + +method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} = + without leafMd =? await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success() + else: + return failure(err) + + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: + if not (err of BlockNotFoundError): + return failure(err) + + await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + +method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + + logScope: + cid = cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return success true + + without key =? makePrefixKey(self.postFixLen, cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + return await self.repoDs.has(key) + +method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} = + without leafMd =? 
await self.getLeafMetadata(treeCid, index), err: + if err of BlockNotFoundError: + return success(false) + else: + return failure(err) + + await self.hasBlock(leafMd.blkCid) + +method listBlocks*( + self: RepoStore, + blockType = BlockType.Manifest +): Future[?!AsyncIter[?Cid]] {.async.} = + ## Get the list of blocks in the RepoStore. + ## This is an intensive operation + ## + + var + iter = AsyncIter[?Cid]() + + let key = + case blockType: + of BlockType.Manifest: CodexManifestKey + of BlockType.Block: CodexBlocksKey + of BlockType.Both: CodexRepoKey + + let query = Query.init(key, value=false) + without queryIter =? (await self.repoDs.query(query)), err: + trace "Error querying cids in repo", blockType, err = err.msg + return failure(err) + + proc next(): Future[?Cid] {.async.} = + await idleAsync() + if queryIter.finished: + iter.finish + else: + if pair =? (await queryIter.next()) and cid =? pair.key: + doAssert pair.data.len == 0 + trace "Retrieved record from repo", cid + return Cid.init(cid.value).option + else: + return Cid.none + + iter.next = next + return success iter + +proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = + let queryKey = ? createBlockExpirationMetadataQueryKey() + success Query.init(queryKey, offset = offset, limit = maxNumber) + +method getBlockExpirations*( + self: RepoStore, + maxNumber: int, + offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = + ## Get iterator with block expirations + ## + + without beQuery =? createBlockExpirationQuery(maxNumber, offset), err: + error "Unable to format block expirations query", err = err.msg + return failure(err) + + without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err: + error "Unable to execute block expirations query", err = err.msg + return failure(err) + + without asyncQueryIter =? await queryIter.toAsyncIter(), err: + error "Unable to convert QueryIter to AsyncIter", err = err.msg + return failure(err) + + let + filteredIter = await asyncQueryIter.filterSuccess() + blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, + proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} = + without cid =? Cid.init(kv.key.value).mapFailure, err: + error "Failed decoding cid", err = err.msg + return BlockExpiration.none + + BlockExpiration(cid: cid, expiry: kv.value.expiry).some + ) + + success(blockExpIter) + +method close*(self: RepoStore): Future[void] {.async.} = + ## Close the blockstore, cleaning up resources managed by it. + ## For some implementations this may be a no-op + ## + + trace "Closing repostore" + + if not self.metaDs.isNil: + (await self.metaDs.close()).expect("Should close meta datastore") + + if not self.repoDs.isNil: + (await self.repoDs.close()).expect("Should close repo datastore") + +########################################################### +# RepoStore procs +########################################################### + +proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Reserve bytes + ## + + trace "Reserving bytes", bytes + + await self.updateQuotaUsage(plusReserved = bytes) + +proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = + ## Release bytes + ## + + trace "Releasing bytes", bytes + + await self.updateQuotaUsage(minusReserved = bytes) + +proc start*(self: RepoStore): Future[void] {.async.} = + ## Start repo + ## + + if self.started: + trace "Repo already started" + return + + trace "Starting repo" + if err =?
(await self.updateTotalBlocksCount()).errorOption: + raise newException(CodexError, err.msg) + + if err =? (await self.updateQuotaUsage()).errorOption: + raise newException(CodexError, err.msg) + + self.started = true + +proc stop*(self: RepoStore): Future[void] {.async.} = + ## Stop repo + ## + if not self.started: + trace "Repo is not started" + return + + trace "Stopping repo" + await self.close() + + self.started = false diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim new file mode 100644 index 00000000..4338e63a --- /dev/null +++ b/codex/stores/repostore/types.nim @@ -0,0 +1,107 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/datastore +import pkg/datastore/typedds +import pkg/libp2p/cid + +import ../blockstore +import ../../clock +import ../../errors +import ../../merkletree +import ../../systemclock +import ../../units + +const + DefaultBlockTtl* = 24.hours + DefaultQuotaBytes* = 8.GiBs + +type + QuotaNotEnoughError* = object of CodexError + + RepoStore* = ref object of BlockStore + postFixLen*: int + repoDs*: Datastore + metaDs*: TypedDatastore + clock*: Clock + quotaMaxBytes*: NBytes + quotaUsage*: QuotaUsage + totalBlocks*: Natural + blockTtl*: Duration + started*: bool + + QuotaUsage* {.serialize.} = object + used*: NBytes + reserved*: NBytes + + BlockMetadata* {.serialize.} = object + expiry*: SecondsSince1970 + size*: NBytes + refCount*: Natural + + LeafMetadata* {.serialize.} = object + blkCid*: Cid + proof*: CodexProof + + BlockExpiration* {.serialize.} = object + cid*: Cid + expiry*: SecondsSince1970 + + DeleteResultKind* {.serialize.} = enum + Deleted = 0, # block removed from store + InUse = 1, # block not removed, refCount > 0 and not expired + NotFound = 2 # block not found in store + + DeleteResult* {.serialize.} = object + kind*: DeleteResultKind + released*: NBytes + + StoreResultKind* {.serialize.} = enum + Stored = 0, # new block stored + AlreadyInStore = 1 # block already in store + + StoreResult* {.serialize.} = object + kind*: StoreResultKind + used*: NBytes + +func quotaUsedBytes*(self: RepoStore): NBytes = + self.quotaUsage.used + +func quotaReservedBytes*(self: RepoStore): NBytes = + self.quotaUsage.reserved + +func totalUsed*(self: RepoStore): NBytes = + (self.quotaUsedBytes + self.quotaReservedBytes) + +func available*(self: RepoStore): NBytes = + return self.quotaMaxBytes - self.totalUsed + +func available*(self: RepoStore, bytes: NBytes): bool = + return bytes < self.available() + +func new*( + T: type RepoStore, + repoDs: Datastore, + metaDs: Datastore, + clock: Clock = SystemClock.new(), + postFixLen = 2, + quotaMaxBytes = DefaultQuotaBytes, + blockTtl = DefaultBlockTtl +): RepoStore = + ## Create new instance of a RepoStore + ## + RepoStore( + repoDs: repoDs, + metaDs: TypedDatastore.init(metaDs), + clock: clock, + postFixLen: postFixLen, + quotaMaxBytes: quotaMaxBytes, + blockTtl: blockTtl + ) diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index ce89171c..8a3b1a3c 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -38,7 +38,6 @@ type StoreStream* = ref object of SeekableStream store*: BlockStore # Store where to lookup block contents manifest*: 
Manifest # List of block CIDs - pad*: bool # Pad last block to manifest.blockSize? method initStream*(s: StoreStream) = if s.objName.len == 0: @@ -57,13 +56,15 @@ proc new*( result = StoreStream( store: store, manifest: manifest, - pad: pad, offset: 0) result.initStream() method `size`*(self: StoreStream): int = - bytes(self.manifest, self.pad).int + ## The size of a StoreStream is the size of the original dataset, without + ## padding or parity blocks. + let m = self.manifest + (if m.protected: m.originalDatasetSize else: m.datasetSize).int proc `size=`*(self: StoreStream, size: int) {.error: "Setting the size is forbidden".} = diff --git a/codex/units.nim b/codex/units.nim index 63921d7d..52f44328 100644 --- a/codex/units.nim +++ b/codex/units.nim @@ -46,9 +46,13 @@ proc `'nb`*(n: string): NBytes = parseInt(n).NBytes logutils.formatIt(NBytes): $it const - MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz + KiB = 1024.NBytes # ByteSz, 1 kibibyte = 1,024 ByteSz + MiB = KiB * 1024 # ByteSz, 1 mebibyte = 1,048,576 ByteSz + GiB = MiB * 1024 # ByteSz, 1 gibibyte = 1,073,741,824 ByteSz +proc KiBs*(v: Natural): NBytes = v.NBytes * KiB proc MiBs*(v: Natural): NBytes = v.NBytes * MiB +proc GiBs*(v: Natural): NBytes = v.NBytes * GiB func divUp*[T: NBytes](a, b : T): int = ## Division with result rounded up (rather than truncated as in 'div') diff --git a/nimble.lock b/nimble.lock deleted file mode 100644 index d001256f..00000000 --- a/nimble.lock +++ /dev/null @@ -1,481 +0,0 @@ -{ - "version": 1, - "packages": { - "stew": { - "version": "0.1.0", - "vcsRevision": "6ad35b876fb6ebe0dfee0f697af173acc47906ee", - "url": "https://github.com/status-im/nim-stew.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "46d58c4feb457f3241e3347778334e325dce5268" - } - }, - "unittest2": { - "version": "0.0.4", - "vcsRevision": "f180f596c88dfd266f746ed6f8dbebce39c824db", - "url": "https://github.com/status-im/nim-unittest2.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "fa309c41eaf6ef57895b9e603f2620a2f6e11780" - } - }, - "httputils": { - "version": "0.3.0", - "vcsRevision": "689da19e9e9cfff4ced85e2b25c6b2b5598ed079", - "url": "https://github.com/status-im/nim-http-utils.git", - "downloadMethod": "git", - "dependencies": [ - "stew" - ], - "checksums": { - "sha1": "4ad3ad68d13c50184180ab4b2eacc0bd7ed2ed44" - } - }, - "nimcrypto": { - "version": "0.5.4", - "vcsRevision": "a5742a9a214ac33f91615f3862c7b099aec43b00", - "url": "https://github.com/cheatfate/nimcrypto.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "f76c87707cd4e96355b8bb6ef27e7f8b0aac1e08" - } - }, - "questionable": { - "version": "0.10.2", - "vcsRevision": "6018fd43e033d5a5310faa45bcaa1b44049469a4", - "url": "https://github.com/status-im/questionable.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "36a6c012637c7736a390e74a7f94667bca562073" - } - }, - "upraises": { - "version": "0.1.0", - "vcsRevision": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2", - "url": "https://github.com/markspanbroek/upraises.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "a0243c8039e12d547dbb2e9c73789c16bb8bc956" - } - }, - "secp256k1": { - "version": "0.5.2", - "vcsRevision": "5340cf188168d6afcafc8023770d880f067c0b2f", - "url": "https://github.com/status-im/nim-secp256k1.git", - "downloadMethod": "git", - "dependencies": [ - "stew", - "nimcrypto" - ], - "checksums": { - "sha1": 
"ae9cbea4487be94a06653ffee075a7f1bd1e231e" - } - }, - "stint": { - "version": "0.0.1", - "vcsRevision": "036c71d06a6b22f8f967ba9d54afd2189c3872ca", - "url": "https://github.com/status-im/stint.git", - "downloadMethod": "git", - "dependencies": [ - "stew" - ], - "checksums": { - "sha1": "0f187a2115315ca898e5f9a30c5e506cf6057062" - } - }, - "contractabi": { - "version": "0.4.4", - "vcsRevision": "b111c27b619fc1d81fb1c6942372824a18a71960", - "url": "https://github.com/status-im/nim-contract-abi", - "downloadMethod": "git", - "dependencies": [ - "stint", - "stew", - "nimcrypto", - "questionable", - "upraises" - ], - "checksums": { - "sha1": "3ed10b11eec8fe14a81e4e58dbc41f88fd6ddf7a" - } - }, - "nitro": { - "version": "0.5.1", - "vcsRevision": "6b4c455bf4dad7449c1580055733a1738fcd5aec", - "url": "https://github.com/status-im/nim-nitro.git", - "downloadMethod": "git", - "dependencies": [ - "nimcrypto", - "questionable", - "upraises", - "contractabi", - "secp256k1", - "stint", - "stew" - ], - "checksums": { - "sha1": "19d90deaeb84b19214dc2aab28a466f0bc4a7e2e" - } - }, - "bearssl": { - "version": "0.1.5", - "vcsRevision": "32e125015ae4251675763842366380795a91b722", - "url": "https://github.com/status-im/nim-bearssl.git", - "downloadMethod": "git", - "dependencies": [ - "unittest2" - ], - "checksums": { - "sha1": "c58a61e71c49ed7c7fe7608df40d60945f7c4bad" - } - }, - "chronos": { - "version": "3.0.11", - "vcsRevision": "17fed89c99beac5a92d3668d0d3e9b0e4ac13936", - "url": "https://github.com/status-im/nim-chronos.git", - "downloadMethod": "git", - "dependencies": [ - "stew", - "bearssl", - "httputils", - "unittest2" - ], - "checksums": { - "sha1": "f6fffc87571e5f76af2a77c4ebcc0e00909ced4e" - } - }, - "testutils": { - "version": "0.4.2", - "vcsRevision": "aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2", - "url": "https://github.com/status-im/nim-testutils", - "downloadMethod": "git", - "dependencies": [ - "unittest2" - ], - "checksums": { - "sha1": "94427e0cce0e0c5841edcd3a6530b4e6b857a3cb" - } - }, - "faststreams": { - "version": "0.3.0", - "vcsRevision": "1b561a9e71b6bdad1c1cdff753418906037e9d09", - "url": "https://github.com/status-im/nim-faststreams.git", - "downloadMethod": "git", - "dependencies": [ - "stew", - "testutils", - "chronos", - "unittest2" - ], - "checksums": { - "sha1": "97edf9797924af48566a0af8267203dc21d80c77" - } - }, - "serialization": { - "version": "0.1.0", - "vcsRevision": "fcd0eadadde0ee000a63df8ab21dc4e9f015a790", - "url": "https://github.com/status-im/nim-serialization.git", - "downloadMethod": "git", - "dependencies": [ - "faststreams", - "unittest2", - "stew" - ], - "checksums": { - "sha1": "fef59519892cac70cccd81b612085caaa5e3e6cf" - } - }, - "json_serialization": { - "version": "0.1.0", - "vcsRevision": "c5f0e2465e8375dfc7aa0f56ccef67cb680bc6b0", - "url": "https://github.com/status-im/nim-json-serialization.git", - "downloadMethod": "git", - "dependencies": [ - "serialization", - "stew" - ], - "checksums": { - "sha1": "d89d79d0679a3a41b350e3ad4be56c0308cc5ec6" - } - }, - "chronicles": { - "version": "0.10.2", - "vcsRevision": "1682096306ddba8185dcfac360a8c3f952d721e4", - "url": "https://github.com/status-im/nim-chronicles.git", - "downloadMethod": "git", - "dependencies": [ - "testutils", - "json_serialization" - ], - "checksums": { - "sha1": "9a5bebb76b0f7d587a31e621d260119279e91c76" - } - }, - "presto": { - "version": "0.0.4", - "vcsRevision": "962bb588d19c7180e39f0d9f18131e75861bab20", - "url": "https://github.com/status-im/nim-presto.git", - "downloadMethod": "git", - 
"dependencies": [ - "chronos", - "chronicles", - "stew" - ], - "checksums": { - "sha1": "8d3e77d7ddf14606504fe86c430b1b5712aada92" - } - }, - "zlib": { - "version": "0.1.0", - "vcsRevision": "74cdeb54b21bededb5a515d36f608bc1850555a2", - "url": "https://github.com/status-im/nim-zlib", - "downloadMethod": "git", - "dependencies": [ - "stew" - ], - "checksums": { - "sha1": "01d330dc4c1924e56b1559ee73bc760e526f635c" - } - }, - "libbacktrace": { - "version": "0.0.8", - "vcsRevision": "ce966b1c469dda179b54346feaaf1a62202c984f", - "url": "https://github.com/status-im/nim-libbacktrace", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "ba7a2f3d21db894ace7bb4ebe0a5b06af995d68b" - } - }, - "dnsclient": { - "version": "0.1.2", - "vcsRevision": "fbb76f8af8a33ab818184a7d4406d9fee20993be", - "url": "https://github.com/ba0f3/dnsclient.nim.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "663239a914c814204b30dda6e0902cc0fbd0b8ee" - } - }, - "metrics": { - "version": "0.0.1", - "vcsRevision": "743f81d4f6c6ebf0ac02389f2392ff8b4235bee5", - "url": "https://github.com/status-im/nim-metrics.git", - "downloadMethod": "git", - "dependencies": [ - "chronos" - ], - "checksums": { - "sha1": "6274c7ae424b871bc21ca3a6b6713971ff6a8095" - } - }, - "asynctest": { - "version": "0.3.1", - "vcsRevision": "5347c59b4b057443a014722aa40800cd8bb95c69", - "url": "https://github.com/status-im/asynctest.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "53e0b610d13700296755a4ebe789882cae47a3b9" - } - }, - "websock": { - "version": "0.1.0", - "vcsRevision": "8a433c6ba43940b13ce56f83d79a93273ece5684", - "url": "https://github.com/status-im/nim-websock.git", - "downloadMethod": "git", - "dependencies": [ - "chronos", - "httputils", - "chronicles", - "stew", - "nimcrypto", - "bearssl", - "zlib" - ], - "checksums": { - "sha1": "1dbb4e1dd8c525c5674dca42b8eb25bdeb2f76b3" - } - }, - "libp2p": { - "version": "0.0.2", - "vcsRevision": "eeb3c210a37408716b6a8b45f578adf87610cef2", - "url": "https://github.com/status-im/nim-libp2p.git", - "downloadMethod": "git", - "dependencies": [ - "nimcrypto", - "dnsclient", - "bearssl", - "chronicles", - "chronos", - "metrics", - "secp256k1", - "stew", - "websock" - ], - "checksums": { - "sha1": "e9e9b93e6e425e47df1eea01a8e9efeac6e0fc97" - } - }, - "combparser": { - "version": "0.2.0", - "vcsRevision": "ba4464c005d7617c008e2ed2ebc1ba52feb469c6", - "url": "https://github.com/PMunch/combparser.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "a3635260961a893b88f69aac19f1b24e032a7e97" - } - }, - "protobuf_serialization": { - "version": "0.2.0", - "vcsRevision": "f7d671f877e01213494aac7903421ccdbe70616f", - "url": "https://github.com/status-im/nim-protobuf-serialization.git", - "downloadMethod": "git", - "dependencies": [ - "stew", - "faststreams", - "serialization", - "combparser" - ], - "checksums": { - "sha1": "9418459027d0d5eb30a974649dc615a76e8e4aca" - } - }, - "confutils": { - "version": "0.1.0", - "vcsRevision": "0435e67832b6bb8dfdf0ddb102903e9d820206d2", - "url": "https://github.com/status-im/nim-confutils.git", - "downloadMethod": "git", - "dependencies": [ - "stew" - ], - "checksums": { - "sha1": "1edab14b434aca6ae28e2385982fa60d623c600a" - } - }, - "news": { - "version": "0.5", - "vcsRevision": "e79420e835489132aaa412f993b565f5dd6295f4", - "url": "https://github.com/status-im/news", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": 
"a5f1789bf650822156712fd3bdec1bf6ab4ac42e" - } - }, - "json_rpc": { - "version": "0.0.2", - "vcsRevision": "5a281760803907f4989cacf109b516381dfbbe11", - "url": "https://github.com/status-im/nim-json-rpc", - "downloadMethod": "git", - "dependencies": [ - "stew", - "nimcrypto", - "stint", - "chronos", - "httputils", - "chronicles", - "news", - "websock", - "json_serialization" - ], - "checksums": { - "sha1": "3ec28a4c9e5dcd3210e85dfcfdd0a6baf46eccbe" - } - }, - "ethers": { - "version": "0.1.7", - "vcsRevision": "270d358b869d02a4c625dde971f799db336670fb", - "url": "https://github.com/status-im/nim-ethers", - "downloadMethod": "git", - "dependencies": [ - "chronos", - "contractabi", - "questionable", - "upraises", - "json_rpc", - "stint", - "stew" - ], - "checksums": { - "sha1": "3eb78a87744d5894595f33a36b21348a59d8f1a5" - } - }, - "libp2pdht": { - "version": "0.0.1", - "vcsRevision": "9a872518d621bf8b390f88cd65617bca6aca1d2d", - "url": "https://github.com/status-im/nim-libp2p-dht.git", - "downloadMethod": "git", - "dependencies": [ - "nimcrypto", - "bearssl", - "chronicles", - "chronos", - "libp2p", - "metrics", - "protobuf_serialization", - "secp256k1", - "stew", - "stint", - "asynctest" - ], - "checksums": { - "sha1": "d97e8b751e11ccc7e059b79fb1a046d2b0d0e872" - } - }, - "lrucache": { - "version": "1.2.1", - "vcsRevision": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd", - "url": "https://github.com/status-im/lrucache.nim", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "2c4365d10029d6f6a8b92a712e9002ac3886b07d" - } - }, - "leopard": { - "version": "0.0.1", - "vcsRevision": "2a6a63923e9b95676b5ae7ff2c346be0e63e753c", - "url": "https://github.com/status-im/nim-leopard", - "downloadMethod": "git", - "dependencies": [ - "stew", - "unittest2", - "upraises" - ], - "checksums": { - "sha1": "e71db348018eab26f3059e1c03bf3088c5109cfe" - } - }, - "taskpools": { - "version": "0.0.3", - "vcsRevision": "8d408ac6cfc9c24ec8b7b65d5993e85050dcbaa9", - "url": "https://github.com/status-im/nim-taskpools.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "37bbbbb03d9b893af6980592624211ab057392c0" - } - }, - "blscurve": { - "version": "0.0.1", - "vcsRevision": "0237e4e0e914fc19359c18a66406d33bc942775c", - "url": "https://github.com/status-im/nim-blscurve", - "downloadMethod": "git", - "dependencies": [ - "nimcrypto", - "stew", - "taskpools" - ], - "checksums": { - "sha1": "65f58854ffd2098e0d0ca08f6ea0efb3c27529e0" - } - } - } -} diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index fa49f878..86f881e0 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -8,6 +8,7 @@ ## those terms. 
import std/sequtils +import std/sugar import pkg/chronos import pkg/libp2p import pkg/questionable @@ -24,33 +25,28 @@ type testBlockExpirations*: seq[BlockExpiration] getBlockExpirationsThrows*: bool - iteratorIndex: int method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = self.delBlockCids.add(cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) - dec self.iteratorIndex return success() -method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async.} = +method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!AsyncIter[BlockExpiration]] {.async.} = if self.getBlockExpirationsThrows: raise new CatchableError self.getBeMaxNumber = maxNumber self.getBeOffset = offset - var iter = AsyncIter[?BlockExpiration]() + let + testBlockExpirationsCpy = @(self.testBlockExpirations) + limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) - self.iteratorIndex = offset - var numberLeft = maxNumber - proc next(): Future[?BlockExpiration] {.async.} = - if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): - dec numberLeft - let selectedBlock = self.testBlockExpirations[self.iteratorIndex] - inc self.iteratorIndex - return selectedBlock.some - iter.finish - return BlockExpiration.none + let + iter1 = AsyncIter[int].new(offset.. orig check (updated.freeSize - orig) == 200.u256 - check (repo.quotaReservedBytes - origQuota) == 200 + check (repo.quotaReservedBytes - origQuota) == 200.NBytes test "update releases quota when lowering size": let @@ -220,7 +257,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize - 100 check isOk await reservations.update(availability) - check (origQuota - repo.quotaReservedBytes) == 100 + check (origQuota - repo.quotaReservedBytes) == 100.NBytes test "update reserves quota when growing size": let @@ -229,7 +266,7 @@ asyncchecksuite "Reservations module": availability.totalSize = availability.totalSize + 100 check isOk await reservations.update(availability) - check (repo.quotaReservedBytes - origQuota) == 100 + check (repo.quotaReservedBytes - origQuota) == 100.NBytes test "reservation can be partially released": let availability = createAvailability() @@ -333,17 +370,17 @@ asyncchecksuite "Reservations module": check got.error of NotExistsError test "can get available bytes in repo": - check reservations.available == DefaultQuotaBytes + check reservations.available == DefaultQuotaBytes.uint test "reports quota available to be reserved": - check reservations.hasAvailable(DefaultQuotaBytes - 1) + check reservations.hasAvailable(DefaultQuotaBytes.uint - 1) test "reports quota not available to be reserved": - check not reservations.hasAvailable(DefaultQuotaBytes + 1) + check not reservations.hasAvailable(DefaultQuotaBytes.uint + 1) test "fails to create availability with size that is larger than available quota": let created = await reservations.createAvailability( - (DefaultQuotaBytes + 1).u256, + (DefaultQuotaBytes.uint + 1).u256, UInt256.example, UInt256.example, UInt256.example diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index c3352cfa..543a7133 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -2,7 +2,7 @@ import std/sequtils import std/sugar import std/times import pkg/chronos -import pkg/datastore +import pkg/datastore/typedds import pkg/questionable import 
pkg/questionable/results import pkg/codex/sales diff --git a/tests/codex/sales/teststates.nim b/tests/codex/sales/teststates.nim index ef562136..73810134 100644 --- a/tests/codex/sales/teststates.nim +++ b/tests/codex/sales/teststates.nim @@ -6,5 +6,9 @@ import ./states/testinitialproving import ./states/testfilled import ./states/testproving import ./states/testsimulatedproving +import ./states/testcancelled +import ./states/testerrored +import ./states/testignored +import ./states/testpreparing {.warning[UnusedImport]: off.} diff --git a/tests/codex/stores/repostore/testcoders.nim b/tests/codex/stores/repostore/testcoders.nim new file mode 100644 index 00000000..47cf4097 --- /dev/null +++ b/tests/codex/stores/repostore/testcoders.nim @@ -0,0 +1,71 @@ +import std/unittest +import std/random + +import pkg/stew/objects +import pkg/questionable +import pkg/questionable/results + +import pkg/codex/clock +import pkg/codex/stores/repostore/types +import pkg/codex/stores/repostore/coders + +import ../../helpers + +checksuite "Test coders": + + proc rand(T: type NBytes): T = + rand(Natural).NBytes + + proc rand(E: type[enum]): E = + let ordinals = enumRangeInt64(E) + E(ordinals[rand(ordinals.len - 1)]) + + proc rand(T: type QuotaUsage): T = + QuotaUsage( + used: rand(NBytes), + reserved: rand(NBytes) + ) + + proc rand(T: type BlockMetadata): T = + BlockMetadata( + expiry: rand(SecondsSince1970), + size: rand(NBytes), + refCount: rand(Natural) + ) + + proc rand(T: type DeleteResult): T = + DeleteResult( + kind: rand(DeleteResultKind), + released: rand(NBytes) + ) + + proc rand(T: type StoreResult): T = + StoreResult( + kind: rand(StoreResultKind), + used: rand(NBytes) + ) + + test "Natural encode/decode": + for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]: + check: + success(val) == Natural.decode(encode(val)) + + test "QuotaUsage encode/decode": + for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)): + check: + success(val) == QuotaUsage.decode(encode(val)) + + test "BlockMetadata encode/decode": + for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)): + check: + success(val) == BlockMetadata.decode(encode(val)) + + test "DeleteResult encode/decode": + for val in newSeqWith[DeleteResult](100, rand(DeleteResult)): + check: + success(val) == DeleteResult.decode(encode(val)) + + test "StoreResult encode/decode": + for val in newSeqWith[StoreResult](100, rand(StoreResult)): + check: + success(val) == StoreResult.decode(encode(val)) diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim index c050f623..bdf48c12 100644 --- a/tests/codex/stores/testmaintenance.nim +++ b/tests/codex/stores/testmaintenance.nim @@ -34,10 +34,10 @@ checksuite "BlockMaintainer": var testBe2: BlockExpiration var testBe3: BlockExpiration - proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = + proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration = BlockExpiration( cid: bt.Block.example.cid, - expiration: expiration + expiry: expiry ) setup: @@ -186,4 +186,3 @@ checksuite "BlockMaintainer": await invokeTimerManyTimes() # Second new block has expired check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] - diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index e58eeb29..ecb3b75e 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -23,6 +23,8 @@ import ../helpers/mockclock import 
../examples import ./commonstoretests +import ./repostore/testcoders + checksuite "Test RepoStore start/stop": var @@ -34,24 +36,24 @@ checksuite "Test RepoStore start/stop": metaDs = SQLiteDatastore.new(Memory).tryGet() test "Should set started flag once started": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() check repo.started test "Should set started flag to false once stopped": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.stop() check not repo.started test "Should allow start to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.start() await repo.start() check repo.started test "Should allow stop to be called multiple times": - let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb) await repo.stop() await repo.stop() check not repo.started @@ -73,7 +75,7 @@ asyncchecksuite "RepoStore": mockClock = MockClock.new() mockClock.set(now) - repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200) + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb) teardown: (await repoDs.close()).tryGet @@ -85,117 +87,107 @@ asyncchecksuite "RepoStore": test "Should update current used bytes on block put": let blk = createTestBlock(200) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet check: - repo.quotaUsedBytes == 200 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u + repo.quotaUsedBytes == 200'nb test "Should update current used bytes on block delete": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb (await repo.delBlock(blk.cid)).tryGet check: - repo.quotaUsedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u + repo.quotaUsedBytes == 0'nb test "Should not update current used bytes if block exist": let blk = createTestBlock(100) - check repo.quotaUsedBytes == 0 + check repo.quotaUsedBytes == 0'nb (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 + check repo.quotaUsedBytes == 100'nb # put again (await repo.putBlock(blk)).tryGet - check repo.quotaUsedBytes == 100 - - check: - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u + check repo.quotaUsedBytes == 100'nb test "Should fail storing passed the quota": let blk = createTestBlock(300) - check repo.totalUsed == 0 - expect QuotaUsedError: + check repo.totalUsed == 0'nb + expect QuotaNotEnoughError: (await repo.putBlock(blk)).tryGet test "Should reserve bytes": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb - (await repo.reserve(100)).tryGet + (await repo.reserve(100'nb)).tryGet check: - repo.totalUsed == 200 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 200'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 100'nb test "Should not reserve bytes over 
max quota": let blk = createTestBlock(100) - check repo.totalUsed == 0 + check repo.totalUsed == 0'nb (await repo.putBlock(blk)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 100'nb expect QuotaNotEnoughError: - (await repo.reserve(101)).tryGet + (await repo.reserve(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 100 - repo.quotaReservedBytes == 0 - - expect DatastoreKeyNotFound: - discard (await metaDs.get(QuotaReservedKey)).tryGet + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 100'nb + repo.quotaReservedBytes == 0'nb test "Should release bytes": discard createTestBlock(100) - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - (await repo.release(100)).tryGet + (await repo.release(100'nb)).tryGet check: - repo.totalUsed == 0 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 0 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u + repo.totalUsed == 0'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 0'nb test "Should not release bytes less than quota": - check repo.totalUsed == 0 - (await repo.reserve(100)).tryGet - check repo.totalUsed == 100 + check repo.totalUsed == 0'nb + (await repo.reserve(100'nb)).tryGet + check repo.totalUsed == 100'nb - expect CatchableError: - (await repo.release(101)).tryGet + expect RangeDefect: + (await repo.release(101'nb)).tryGet check: - repo.totalUsed == 100 - repo.quotaUsedBytes == 0 - repo.quotaReservedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + repo.totalUsed == 100'nb + repo.quotaUsedBytes == 0'nb + repo.quotaReservedBytes == 100'nb - proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = - let - query = Query.init(key) - responseIter = (await metaDs.query(query)).tryGet - response = (await allFinished(toSeq(responseIter))) - .mapIt(it.read.tryGet) - .filterIt(it.key.isSome) - return response + proc getExpirations(): Future[seq[BlockExpiration]] {.async.} = + let iter = (await repo.getBlockExpirations(100, 0)).tryGet() + + var res = newSeq[BlockExpiration]() + for fut in iter: + let be = await fut + res.add(be) + + res test "Should store block expiration timestamp": let @@ -203,49 +195,40 @@ asyncchecksuite "RepoStore": blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + 10 - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, duration.some)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should store block with default expiration timestamp when not provided": let blk = createTestBlock(100) let - expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds - expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds) (await repo.putBlock(blk)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations test "Should refuse update expiry with negative timestamp": let blk = 
createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations expect ValueError: (await repo.ensureExpiry(blk.cid, -1)).tryGet @@ -262,56 +245,45 @@ asyncchecksuite "RepoStore": test "Should update block expiration timestamp when new expiration is farther": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20) - (await repo.putBlock(blk, some duration.seconds)).tryGet + (await repo.putBlock(blk, some 10.seconds)).tryGet - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 20)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == updatedExpectedExpiration.toBytes + expectedExpiration notin updatedExpirations + updatedExpectedExpiration in updatedExpirations test "Should not update block expiration timestamp when current expiration is farther then new one": let - duration = 10 blk = createTestBlock(100) - expectedExpiration: SecondsSince1970 = now + duration - updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 - expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10) + updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5) + (await repo.putBlock(blk, some 10.seconds)).tryGet - (await repo.putBlock(blk, some duration.seconds)).tryGet - - var response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in expirations - (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + (await repo.ensureExpiry(blk.cid, now + 5)).tryGet - response = await queryMetaDs(expectedKey) + let updatedExpirations = await getExpirations() check: - response.len == 1 - !response[0].key == expectedKey - response[0].data == expectedExpiration.toBytes + expectedExpiration in updatedExpirations + updatedExpectedExpiration notin updatedExpirations test "delBlock should remove expiration metadata": let @@ -321,19 +293,19 @@ asyncchecksuite "RepoStore": (await repo.putBlock(blk, 10.seconds.some)).tryGet (await repo.delBlock(blk.cid)).tryGet - let response = await queryMetaDs(expectedKey) + let expirations = await getExpirations() check: - response.len == 0 + expirations.len == 0 test "Should retrieve block expiration information": - proc unpack(beIter: 
Future[?!AsyncIter[?BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = + proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: return expirations - for be in toSeq(iter): - if value =? (await be): - expirations.add(value) + for beFut in toSeq(iter): + let value = await beFut + expirations.add(value) return expirations let @@ -343,12 +315,12 @@ asyncchecksuite "RepoStore": blk3 = createTestBlock(12) let - expectedExpiration: SecondsSince1970 = 123 + 10 + expectedExpiration: SecondsSince1970 = now + 10 proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = check: be.cid == expectedBlock.cid - be.expiration == expectedExpiration + be.expiry == expectedExpiration (await repo.putBlock(blk1, duration.some)).tryGet diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 8ca428e7..7a30259f 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -58,6 +58,14 @@ suite "Indexing strategies": expect IndexingWrongIterationsError: discard LinearStrategy.init(0, 10, 0) + test "should split elements evenly when possible": + let + l = LinearStrategy.init(0, 11, 3) + check: + toSeq(l.getIndicies(0)) == @[0, 1, 2, 3].mapIt(it) + toSeq(l.getIndicies(1)) == @[4, 5, 6, 7].mapIt(it) + toSeq(l.getIndicies(2)) == @[8, 9, 10, 11].mapIt(it) + test "linear - oob": expect IndexingError: discard linear.getIndicies(3) diff --git a/tests/codex/testmanifest.nim b/tests/codex/testmanifest.nim index 188cc957..3393fa08 100644 --- a/tests/codex/testmanifest.nim +++ b/tests/codex/testmanifest.nim @@ -74,3 +74,36 @@ checksuite "Manifest": test "Should encode/decode to/from verifiable manifest": check: encodeDecode(verifiableManifest) == verifiableManifest + + +suite "Manifest - Attribute Inheritance": + proc makeProtectedManifest(strategy: StrategyType): Manifest = + Manifest.new( + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 1.MiBs, + datasetSize = 100.MiBs, + ), + treeCid = Cid.example, + datasetSize = 200.MiBs, + ecK = 1, + ecM = 1, + strategy = strategy + ) + + test "Should preserve interleaving strategy for protected manifest in verifiable manifest": + var verifiable = Manifest.new( + manifest = makeProtectedManifest(SteppedStrategy), + verifyRoot = Cid.example, + slotRoots = @[Cid.example, Cid.example] + ).tryGet() + + check verifiable.protectedStrategy == SteppedStrategy + + verifiable = Manifest.new( + manifest = makeProtectedManifest(LinearStrategy), + verifyRoot = Cid.example, + slotRoots = @[Cid.example, Cid.example] + ).tryGet() + + check verifiable.protectedStrategy == LinearStrategy diff --git a/tests/codex/teststorestream.nim b/tests/codex/teststorestream.nim index eaf92c1d..b717a8ec 100644 --- a/tests/codex/teststorestream.nim +++ b/tests/codex/teststorestream.nim @@ -1,12 +1,15 @@ import pkg/chronos import pkg/questionable/results -import pkg/codex/streams -import pkg/codex/stores -import pkg/codex/manifest -import pkg/codex/blocktype as bt +import pkg/codex/[ + streams, + stores, + indexingstrategy, + manifest, + blocktype as bt] import ../asynctest +import ./examples import ./helpers asyncchecksuite "StoreStream": @@ -99,3 +102,40 @@ asyncchecksuite "StoreStream": await stream.readExactly(addr buf[0], 15) check sequentialBytes(buf,15,0) + +suite "StoreStream - Size Tests": + + var stream: StoreStream + + teardown: + await stream.close() + + test "Should return 
dataset size as stream size": + let manifest = Manifest.new( + treeCid = Cid.example, + datasetSize = 80.NBytes, + blockSize = 10.NBytes + ) + + stream = StoreStream.new(CacheStore.new(), manifest) + + check stream.size == 80 + + test "Should not count parity/padding bytes as part of stream size": + let protectedManifest = Manifest.new( + treeCid = Cid.example, + datasetSize = 120.NBytes, # size including parity bytes + blockSize = 10.NBytes, + version = CIDv1, + hcodec = Sha256HashCodec, + codec = BlockCodec, + ecK = 2, + ecM = 1, + originalTreeCid = Cid.example, + originalDatasetSize = 80.NBytes, # size without parity bytes + strategy = StrategyType.SteppedStrategy + ) + + stream = StoreStream.new(CacheStore.new(), protectedManifest) + + check stream.size == 80 diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 74363f60..8c2c20e4 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,5 +1,6 @@ import std/sequtils from pkg/libp2p import `==` +import pkg/codex/units import ./twonodes twonodessuite "REST API", debug1 = false, debug2 = false: @@ -20,10 +21,10 @@ twonodessuite "REST API", debug1 = false, debug2 = false: discard client1.postAvailability(totalSize=12.u256, duration=2.u256, minPrice=3.u256, maxCollateral=4.u256).get let space = client1.space().tryGet() check: - space.totalBlocks == 2.uint - space.quotaMaxBytes == 8589934592.uint - space.quotaUsedBytes == 65592.uint - space.quotaReservedBytes == 12.uint + space.totalBlocks == 2 + space.quotaMaxBytes == 8589934592.NBytes + space.quotaUsedBytes == 65592.NBytes + space.quotaReservedBytes == 12.NBytes test "node lists local files": let content1 = "some file contents" diff --git a/vendor/atlas.workspace b/vendor/atlas.workspace deleted file mode 100644 index 812bfb2d..00000000 --- a/vendor/atlas.workspace +++ /dev/null @@ -1,3 +0,0 @@ -deps="" -resolver="MaxVer" -overrides="urls.rules"
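The KiBs/MiBs/GiBs helpers added to codex/units.nim above are what make DefaultQuotaBytes = 8.GiBs and the NBytes-typed REST space checks line up. A quick arithmetic sketch, assuming the same pkg/codex/units module is importable:

import pkg/codex/units

# 8.GiBs is 8 * 1024^3 = 8589934592 bytes, the quotaMaxBytes value the
# REST API test above expects.
assert 8.GiBs == 8589934592'nb

# The helpers compose the same way the constants are defined
# (GiB = MiB * 1024, MiB = KiB * 1024):
assert 1.GiBs == 1024.MiBs
assert 1.MiBs == 1024.KiBs
assert 1.KiBs == 1024'nb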