Merge branch 'master' into feature/ceremony-files

This commit is contained in:
Ben 2024-06-18 10:41:32 +02:00
commit 118707fdec
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
25 changed files with 748 additions and 868 deletions

View File

@ -32,6 +32,7 @@ jobs:
uses: actions/checkout@v4
with:
submodules: recursive
ref: ${{ github.event.pull_request.head.sha }}
- name: Setup Nimbus Build System
uses: ./.github/actions/nimbus-build-system

View File

@ -9,7 +9,7 @@ on:
env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: v1.6.14
nim_version: pinned
concurrency:
@ -48,6 +48,7 @@ jobs:
uses: actions/checkout@v4
with:
submodules: recursive
ref: ${{ github.event.pull_request.head.sha }}
- name: Setup Nimbus Build System
uses: ./.github/actions/nimbus-build-system

View File

@ -6,7 +6,7 @@ on:
env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: v1.6.14, v1.6.16, v1.6.18
nim_version: pinned, v1.6.16, v1.6.18
jobs:
matrix:

View File

@ -5,6 +5,30 @@
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
# This is the Nim version used locally and in regular CI builds.
# Can be a specific version tag, a branch name, or a commit hash.
# Can be overridden by setting the NIM_COMMIT environment variable
# before calling make.
#
# For readability in CI, if NIM_COMMIT is set to "pinned",
# this will also default to the version pinned here.
#
# If NIM_COMMIT is set to "nimbusbuild", this will use the
# version pinned by nimbus-build-system.
PINNED_NIM_VERSION := v1.6.14
ifeq ($(NIM_COMMIT),)
NIM_COMMIT := $(PINNED_NIM_VERSION)
else ifeq ($(NIM_COMMIT),pinned)
NIM_COMMIT := $(PINNED_NIM_VERSION)
endif
ifeq ($(NIM_COMMIT),nimbusbuild)
undefine NIM_COMMIT
else
export NIM_COMMIT
endif
SHELL := bash # the shell used internally by Make
# used inside the included makefiles

View File

@ -1,209 +0,0 @@
{
"clangVersion": "",
"gccVersion": "",
"hostCPU": "arm64",
"hostOS": "macosx",
"items": {
"asynctest": {
"commit": "8e2f4e73b97123be0f0041c129942b32df23ecb1",
"dir": "vendor/asynctest",
"url": "https://github.com/codex-storage/asynctest"
},
"dnsclient.nim": {
"commit": "23214235d4784d24aceed99bbfe153379ea557c8",
"dir": "vendor/dnsclient.nim",
"url": "https://github.com/ba0f3/dnsclient.nim"
},
"lrucache.nim": {
"commit": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd",
"dir": "vendor/lrucache.nim",
"url": "https://github.com/status-im/lrucache.nim"
},
"nim-bearssl": {
"commit": "99fcb3405c55b27cfffbf60f5368c55da7346f23",
"dir": "vendor/nim-bearssl",
"url": "https://github.com/status-im/nim-bearssl"
},
"nim-blscurve": {
"commit": "48d8668c5a9a350d3a7ee0c3713ef9a11980a40d",
"dir": "vendor/nim-blscurve",
"url": "https://github.com/status-im/nim-blscurve"
},
"nim-chronicles": {
"commit": "c9c8e58ec3f89b655a046c485f622f9021c68b61",
"dir": "vendor/nim-chronicles",
"url": "https://github.com/status-im/nim-chronicles"
},
"nim-chronos": {
"commit": "0277b65be2c7a365ac13df002fba6e172be55537",
"dir": "vendor/nim-chronos",
"url": "https://github.com/status-im/nim-chronos"
},
"nim-confutils": {
"commit": "2028b41602b3abf7c9bf450744efde7b296707a2",
"dir": "vendor/nim-confutils",
"url": "https://github.com/status-im/nim-confutils"
},
"nim-contract-abi": {
"commit": "61f8f59b3917d8e27c6eb4330a6d8cf428e98b2d",
"dir": "vendor/nim-contract-abi",
"url": "https://github.com/status-im/nim-contract-abi"
},
"nim-datastore": {
"commit": "0cde8aeb67c59fd0ac95496dc6b5e1168d6632aa",
"dir": "vendor/nim-datastore",
"url": "https://github.com/codex-storage/nim-datastore"
},
"nim-faststreams": {
"commit": "720fc5e5c8e428d9d0af618e1e27c44b42350309",
"dir": "vendor/nim-faststreams",
"url": "https://github.com/status-im/nim-faststreams"
},
"nim-http-utils": {
"commit": "3b491a40c60aad9e8d3407443f46f62511e63b18",
"dir": "vendor/nim-http-utils",
"url": "https://github.com/status-im/nim-http-utils"
},
"nim-json-rpc": {
"commit": "0bf2bcbe74a18a3c7a709d57108bb7b51e748a92",
"dir": "vendor/nim-json-rpc",
"url": "https://github.com/status-im/nim-json-rpc"
},
"nim-json-serialization": {
"commit": "bb53d49caf2a6c6cf1df365ba84af93cdcfa7aa3",
"dir": "vendor/nim-json-serialization",
"url": "https://github.com/status-im/nim-json-serialization"
},
"nim-leopard": {
"commit": "1a6f2ab7252426a6ac01482a68b75d0c3b134cf0",
"dir": "vendor/nim-leopard",
"url": "https://github.com/status-im/nim-leopard"
},
"nim-libbacktrace": {
"commit": "b29c22ba0ef13de50b779c776830dbea1d50cd33",
"dir": "vendor/nim-libbacktrace",
"url": "https://github.com/status-im/nim-libbacktrace"
},
"nim-libp2p": {
"commit": "440461b24b9e66542b34d26a0b908c17f6549d05",
"dir": "vendor/nim-libp2p",
"url": "https://github.com/status-im/nim-libp2p"
},
"nim-libp2p-dht": {
"commit": "fdd02450aa6979add7dabd29a3ba0f8738bf89f8",
"dir": "vendor/nim-libp2p-dht",
"url": "https://github.com/status-im/nim-libp2p-dht"
},
"nim-metrics": {
"commit": "6142e433fc8ea9b73379770a788017ac528d46ff",
"dir": "vendor/nim-metrics",
"url": "https://github.com/status-im/nim-metrics"
},
"nim-nat-traversal": {
"commit": "27d314d65c9078924b3239fe4e2f5af0c512b28c",
"dir": "vendor/nim-nat-traversal",
"url": "https://github.com/status-im/nim-nat-traversal"
},
"nim-nitro": {
"commit": "6b4c455bf4dad7449c1580055733a1738fcd5aec",
"dir": "vendor/nim-nitro",
"url": "https://github.com/status-im/nim-nitro"
},
"nim-presto": {
"commit": "3984431dc0fc829eb668e12e57e90542b041d298",
"dir": "vendor/nim-presto",
"url": "https://github.com/status-im/nim-presto"
},
"nim-protobuf-serialization": {
"commit": "28214b3e40c755a9886d2ec8f261ec48fbb6bec6",
"dir": "vendor/nim-protobuf-serialization",
"url": "https://github.com/status-im/nim-protobuf-serialization"
},
"nim-results": {
"commit": "f3c666a272c69d70cb41e7245e7f6844797303ad",
"dir": "vendor/nim-results",
"url": "https://github.com/arnetheduck/nim-results"
},
"nim-secp256k1": {
"commit": "2acbbdcc0e63002a013fff49f015708522875832",
"dir": "vendor/nim-secp256k1",
"url": "https://github.com/status-im/nim-secp256k1"
},
"nim-serialization": {
"commit": "384eb2561ee755446cff512a8e057325848b86a7",
"dir": "vendor/nim-serialization",
"url": "https://github.com/status-im/nim-serialization"
},
"nim-sqlite3-abi": {
"commit": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3",
"dir": "vendor/nim-sqlite3-abi",
"url": "https://github.com/arnetheduck/nim-sqlite3-abi"
},
"nim-stew": {
"commit": "7afe7e3c070758cac1f628e4330109f3ef6fc853",
"dir": "vendor/nim-stew",
"url": "https://github.com/status-im/nim-stew"
},
"nim-taskpools": {
"commit": "b3673c7a7a959ccacb393bd9b47e997bbd177f5a",
"dir": "vendor/nim-taskpools",
"url": "https://github.com/status-im/nim-taskpools"
},
"nim-testutils": {
"commit": "b56a5953e37fc5117bd6ea6dfa18418c5e112815",
"dir": "vendor/nim-testutils",
"url": "https://github.com/status-im/nim-testutils"
},
"nim-toml-serialization": {
"commit": "86d477136f105f04bfd0dd7c0e939593d81fc581",
"dir": "vendor/nim-toml-serialization",
"url": "https://github.com/status-im/nim-toml-serialization"
},
"nim-unittest2": {
"commit": "b178f47527074964f76c395ad0dfc81cf118f379",
"dir": "vendor/nim-unittest2",
"url": "https://github.com/status-im/nim-unittest2"
},
"nim-websock": {
"commit": "2c3ae3137f3c9cb48134285bd4a47186fa51f0e8",
"dir": "vendor/nim-websock",
"url": "https://github.com/status-im/nim-websock"
},
"nim-zlib": {
"commit": "f34ca261efd90f118dc1647beefd2f7a69b05d93",
"dir": "vendor/nim-zlib",
"url": "https://github.com/status-im/nim-zlib"
},
"nim-stint": {
"dir": "vendor/stint",
"url": "https://github.com/status-im/nim-stint",
"commit": "86621eced1dcfb5e25903019ebcfc76ed9128ec5"
},
"nimcrypto": {
"commit": "24e006df85927f64916e60511620583b11403178",
"dir": "vendor/nimcrypto",
"url": "https://github.com/status-im/nimcrypto"
},
"npeg": {
"commit": "b15a10e388b91b898c581dbbcb6a718d46b27d2f",
"dir": "vendor/npeg",
"url": "https://github.com/zevv/npeg"
},
"questionable": {
"commit": "47692e0d923ada8f7f731275b2a87614c0150987",
"dir": "vendor/questionable",
"url": "https://github.com/codex-storage/questionable"
},
"upraises": {
"commit": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2",
"dir": "vendor/upraises",
"url": "https://github.com/markspanbroek/upraises"
}
},
"nimVersion": "1.6.14",
"nimbleFile": {
"content": "# Package\n\nversion = \"0.3.2\"\nauthor = \"Status Research & Development GmbH\"\ndescription = \"DHT based on the libp2p Kademlia spec\"\nlicense = \"MIT\"\nskipDirs = @[\"tests\"]\n\n\n# Dependencies\nrequires \"nim >= 1.2.0\"\nrequires \"secp256k1#2acbbdcc0e63002a013fff49f015708522875832\" # >= 0.5.2 & < 0.6.0\nrequires \"protobuf_serialization\" # >= 0.2.0 & < 0.3.0\nrequires \"nimcrypto == 0.5.4\"\nrequires \"bearssl#head\"\nrequires \"chronicles >= 0.10.2 & < 0.11.0\"\nrequires \"chronos == 3.2.0\" # >= 3.0.11 & < 3.1.0\nrequires \"libp2p#unstable\"\nrequires \"metrics\"\nrequires \"stew#head\"\nrequires \"stint\"\nrequires \"asynctest >= 0.3.1 & < 0.4.0\"\nrequires \"https://github.com/codex-storage/nim-datastore#head\"\nrequires \"questionable\"\n\ninclude \"build.nims\"\n\n",
"filename": ""
},
"nimcfg": "############# begin Atlas config section ##########\n--noNimblePath\n--path:\"vendor/nim-secp256k1\"\n--path:\"vendor/nim-protobuf-serialization\"\n--path:\"vendor/nimcrypto\"\n--path:\"vendor/nim-bearssl\"\n--path:\"vendor/nim-chronicles\"\n--path:\"vendor/nim-chronos\"\n--path:\"vendor/nim-libp2p\"\n--path:\"vendor/nim-metrics\"\n--path:\"vendor/nim-stew\"\n--path:\"vendor/nim-stint\"\n--path:\"vendor/asynctest\"\n--path:\"vendor/nim-datastore\"\n--path:\"vendor/questionable\"\n--path:\"vendor/nim-faststreams\"\n--path:\"vendor/nim-serialization\"\n--path:\"vendor/npeg/src\"\n--path:\"vendor/nim-unittest2\"\n--path:\"vendor/nim-testutils\"\n--path:\"vendor/nim-json-serialization\"\n--path:\"vendor/nim-http-utils\"\n--path:\"vendor/dnsclient.nim/src\"\n--path:\"vendor/nim-websock\"\n--path:\"vendor/nim-results\"\n--path:\"vendor/nim-sqlite3-abi\"\n--path:\"vendor/upraises\"\n--path:\"vendor/nim-zlib\"\n############# end Atlas config section ##########\n"
}

View File

@ -6,31 +6,4 @@ binDir = "build"
srcDir = "."
installFiles = @["build.nims"]
requires "nim >= 1.2.0"
requires "asynctest >= 0.5.1 & < 0.6.0"
requires "bearssl >= 0.1.4"
requires "chronicles >= 0.7.2"
requires "chronos >= 2.5.2"
requires "confutils"
requires "ethers >= 0.7.3 & < 0.8.0"
requires "libbacktrace"
requires "libp2p"
requires "metrics"
requires "nimcrypto >= 0.4.1"
requires "nitro >= 0.5.1 & < 0.6.0"
requires "presto"
requires "protobuf_serialization >= 0.2.0 & < 0.3.0"
requires "questionable >= 0.10.13 & < 0.11.0"
requires "secp256k1"
requires "serde >= 1.0.0 & < 2.0.0"
requires "stew"
requires "upraises >= 0.1.0 & < 0.2.0"
requires "toml_serialization"
requires "https://github.com/status-im/lrucache.nim#1.2.2"
requires "leopard >= 0.1.0 & < 0.2.0"
requires "blscurve"
requires "libp2pdht"
requires "eth"
requires "https://github.com/codex-storage/nim-poseidon2.git >= 0.1.0 & < 0.2.0"
include "build.nims"

View File

@ -297,7 +297,7 @@ proc new*(
taskpool = taskpool)
restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore),
codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
initTAddress(config.apiBindAddress , config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high)

View File

@ -198,6 +198,12 @@ type
name: "api-port"
abbr: "p" }: Port
apiCorsAllowedOrigin* {.
desc: "The REST Api CORS allowed origin for downloading data. '*' will allow all origins, '' will allow none.",
defaultValue: string.none
defaultValueDesc: "Disallow all cross origin requests to download data"
name: "api-cors-origin" }: Option[string]
repoKind* {.
desc: "Backend for main repo store (fs, sqlite, leveldb)"
defaultValueDesc: "fs"

View File

@ -120,7 +120,7 @@ proc getPendingBlocks(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)
Iter.new(genNext, isFinished)
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
proc prepareEncodingData(
self: Erasure,
@ -440,8 +440,7 @@ proc decode*(
if treeCid != encoded.originalTreeCid:
return failure("Original tree root differs from the tree root computed out of recovered data")
let idxIter = Iter
.fromItems(recoveredIndices)
let idxIter = Iter[Natural].new(recoveredIndices)
.filter((i: Natural) => i < tree.leavesCount)
if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption:

View File

@ -39,22 +39,9 @@ func checkIteration(self: IndexingStrategy, iteration: int): void {.raises: [Ind
IndexingError,
"Indexing iteration can't be greater than or equal to iterations.")
proc getIter(first, last, step: int): Iter[int] =
var
finish = false
cur = first
func get(): int =
result = cur
cur += step
if cur > last:
finish = true
func isFinished(): bool =
finish
Iter.new(get, isFinished)
func getIter(first, last, step: int): Iter[int] =
{.cast(noSideEffect).}:
Iter[int].new(first, last, step)
func getLinearIndicies(
self: IndexingStrategy,

View File

@ -157,10 +157,8 @@ proc updateExpiry*(
try:
let
ensuringFutures = Iter
.fromSlice(0..<manifest.blocksCount)
.mapIt(
self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
ensuringFutures = Iter[int].new(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
@ -209,7 +207,7 @@ proc fetchBatched*(
trace "Fetching blocks in batches of", size = batchSize
let iter = Iter.fromSlice(0..<manifest.blocksCount)
let iter = Iter[int].new(0..<manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)
proc streamSingleBlock(

View File

@ -107,6 +107,8 @@ proc retrieveCid(
await stream.close()
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion
router.rawApi(
MethodPost,
"/api/codex/v1/data") do (
@ -166,6 +168,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
Http400,
$cid.error())
if corsOrigin =? allowedOrigin:
resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
resp.setHeader("Access-Control-Headers", "X-Requested-With")
resp.setHeader("Access-Control-Max-Age", "86400")
await node.retrieveCid(cid.get(), local = true, resp=resp)
router.api(
@ -181,6 +189,12 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
Http400,
$cid.error())
if corsOrigin =? allowedOrigin:
resp.setHeader("Access-Control-Allow-Origin", corsOrigin)
resp.setHeader("Access-Control-Allow-Methods", "GET, OPTIONS")
resp.setHeader("Access-Control-Headers", "X-Requested-With")
resp.setHeader("Access-Control-Max-Age", "86400")
await node.retrieveCid(cid.get(), local = false, resp=resp)
router.api(
@ -636,8 +650,13 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500)
proc initRestApi*(node: CodexNodeRef, conf: CodexConf, repoStore: RepoStore): RestRouter =
var router = RestRouter.init(validate)
proc initRestApi*(
node: CodexNodeRef,
conf: CodexConf,
repoStore: RepoStore,
corsAllowedOrigin: ?string): RestRouter =
var router = RestRouter.init(validate, corsAllowedOrigin)
initDataApi(node, repoStore, router)
initSalesApi(node, router)

View File

@ -132,51 +132,35 @@ method listBlocks*(
## Get the list of blocks in the BlockStore. This is an intensive operation
##
var
iter = AsyncIter[?Cid]()
let
cids = self.cids()
proc next(): Future[?Cid] {.async.} =
await idleAsync()
proc isFinished(): bool =
return finished(cids)
var cid: Cid
while true:
if iter.finished:
return Cid.none
proc genNext(): Future[Cid] {.async.} =
cids()
cid = cids()
let iter = await (AsyncIter[Cid].new(genNext, isFinished)
.filter(
proc (cid: Cid): Future[bool] {.async.} =
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return false
if finished(cids):
iter.finish
return Cid.none
case blockType:
of BlockType.Both:
return true
of BlockType.Manifest:
return isManifest
of BlockType.Block:
return not isManifest
))
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return Cid.none
case blockType:
of BlockType.Manifest:
if not isManifest:
trace "Cid is not manifest, skipping", cid
continue
break
of BlockType.Block:
if isManifest:
trace "Cid is a manifest, skipping", cid
continue
break
of BlockType.Both:
break
return cid.some
iter.next = next
return success iter
return success(map[Cid, ?Cid](iter,
proc (cid: Cid): Future[?Cid] {.async.} =
some(cid)
))
func putBlockSync(self: CacheStore, blk: Block): bool =

View File

@ -0,0 +1,66 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/chronicles
import pkg/datastore/typedds
import ../utils/asynciter
type KeyVal[T] = tuple[key: Key, value: T]
proc toAsyncIter*[T](
queryIter: QueryIter[T],
finishOnErr: bool = true
): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} =
## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically
## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only
## if the flag finishOnErr is set to true)
##
if queryIter.finished:
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return success(AsyncIter[?!QueryResponse[T]].empty())
var errOccurred = false
proc genNext: Future[?!QueryResponse[T]] {.async.} =
let queryResOrErr = await queryIter.next()
if queryResOrErr.isErr:
errOccurred = true
if queryIter.finished or (errOccurred and finishOnErr):
trace "Disposing iterator"
if error =? (await queryIter.dispose()).errorOption:
return failure(error)
return queryResOrErr
proc isFinished(): bool =
queryIter.finished or (errOccurred and finishOnErr)
AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success
proc filterSuccess*[T](
iter: AsyncIter[?!QueryResponse[T]]
): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} =
## Filters out any items that are not success
proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} =
without res =? resOrErr, error:
error "Error occurred when getting QueryResponse", msg = error.msg
return KeyVal[T].none
without key =? res.key:
warn "No key for a QueryResponse"
return KeyVal[T].none
without value =? res.value, error:
error "Error occurred when getting a value from QueryResponse", msg = error.msg
return KeyVal[T].none
(key: key, value: value).some
await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping)

View File

@ -47,4 +47,4 @@ proc putSomeProofs*(store: BlockStore, tree: CodexTree, iter: Iter[Natural]): Fu
store.putSomeProofs(tree, iter.map((i: Natural) => i.ord))
proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] =
store.putSomeProofs(tree, Iter.fromSlice(0..<tree.leavesCount))
store.putSomeProofs(tree, Iter[int].new(0..<tree.leavesCount))

View File

@ -3,26 +3,29 @@ import std/sugar
import pkg/questionable
import pkg/chronos
type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, noSideEffect.}
IsFinished* = proc(): bool {.raises: [], gcsafe, noSideEffect.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
AsyncIter*[T] = Iter[Future[T]]
import ./iter
proc finish*[T](self: Iter[T]): void =
export iter
## AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing
##
type
AsyncIter*[T] = ref object
finished: bool
next*: GenNext[Future[T]]
proc finish*[T](self: AsyncIter[T]): void =
self.finished = true
proc finished*[T](self: Iter[T]): bool =
proc finished*[T](self: AsyncIter[T]): bool =
self.finished
iterator items*[T](self: Iter[T]): T =
iterator items*[T](self: AsyncIter[T]): Future[T] =
while not self.finished:
yield self.next()
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
@ -32,14 +35,25 @@ proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)
proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
var iter = Iter[T]()
proc flatMap*[T, U](fut: Future[T], fn: Function[T, Future[U]]): Future[U] {.async.} =
let t = await fut
await fn(t)
proc next(): T {.raises: [CatchableError].} =
proc new*[T](_: type AsyncIter[T], genNext: GenNext[Future[T]], isFinished: IsFinished, finishOnErr: bool = true): AsyncIter[T] =
## Creates a new Iter using elements returned by supplier function `genNext`.
## Iter is finished whenever `isFinished` returns true.
##
var iter = AsyncIter[T]()
proc next(): Future[T] {.async.} =
if not iter.finished:
var item: T
try:
item = genNext()
item = await genNext()
except CancelledError as err:
iter.finish
raise err
except CatchableError as err:
if finishOnErr or isFinished():
iter.finish
@ -49,7 +63,7 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
iter.finish
return item
else:
raise newException(CatchableError, "Iterator is finished but next item was requested")
raise newException(CatchableError, "AsyncIter is finished but next item was requested")
if isFinished():
iter.finish
@ -57,90 +71,95 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
iter.next = next
return iter
proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
## Create new iterator from items
proc mapAsync*[T, U](iter: Iter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
AsyncIter[U].new(
genNext = () => fn(iter.next()),
isFinished = () => iter.finished()
)
proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] =
## Creates new Iter from a slice
##
Iter.fromSlice(0..<items.len)
.map((i: int) => items[i])
let iter = Iter[U].new(slice)
mapAsync[U, U](iter,
proc (i: U): Future[U] {.async.} =
i
)
proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
## Creates new iterator from slice
proc new*[U, V, S: Ordinal](_: type AsyncIter[U], a: U, b: V, step: S = 1): AsyncIter[U] =
## Creates new Iter in range a..b with specified step (default 1)
##
Iter.fromRange(slice.a.int, slice.b.int, 1)
let iter = Iter[U].new(a, b, step)
mapAsync[U, U](iter,
proc (i: U): Future[U] {.async.} =
i
)
proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] =
## Creates new iterator in range a..b with specified step (default 1)
proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] =
## Creates an empty AsyncIter
##
var i = a
proc genNext(): Future[T] {.raises: [CatchableError].} =
raise newException(CatchableError, "Next item requested from an empty AsyncIter")
proc isFinished(): bool = true
proc genNext(): U =
let u = i
inc(i, step)
u
AsyncIter[T].new(genNext, isFinished)
proc isFinished(): bool =
(step > 0 and i > b) or
(step < 0 and i < b)
Iter.new(genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter.new(
genNext = () => fn(iter.next()),
proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] =
AsyncIter[U].new(
genNext = () => iter.next().flatMap(fn),
isFinished = () => iter.finished
)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
var nextT: Option[T]
proc mapFilter*[T, U](iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]]): Future[AsyncIter[U]] {.async.} =
var nextFutU: Option[Future[U]]
proc tryFetch(): void =
nextT = T.none
proc tryFetch(): Future[void] {.async.} =
nextFutU = Future[U].none
while not iter.finished:
let t = iter.next()
if predicate(t):
nextT = some(t)
let futT = iter.next()
try:
if u =? await futT.flatMap(mapPredicate):
let futU = newFuture[U]("AsyncIter.mapFilterAsync")
futU.complete(u)
nextFutU = some(futU)
break
except CancelledError as err:
raise err
except CatchableError as err:
let errFut = newFuture[U]("AsyncIter.mapFilterAsync")
errFut.fail(err)
nextFutU = some(errFut)
break
proc genNext(): T =
let t = nextT.unsafeGet
tryFetch()
return t
proc genNext(): Future[U] {.async.} =
let futU = nextFutU.unsafeGet
await tryFetch()
await futU
proc isFinished(): bool =
nextT.isNone
nextFutU.isNone
tryFetch()
Iter.new(genNext, isFinished)
await tryFetch()
AsyncIter[U].new(genNext, isFinished)
proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
var ringBuf = newSeq[T](n)
var iterLen = int.high
var i = 0
proc tryFetch(j: int): void =
if not iter.finished:
let item = iter.next()
ringBuf[j mod n] = item
if iter.finished:
iterLen = min(j + 1, iterLen)
proc filter*[T](iter: AsyncIter[T], predicate: Function[T, Future[bool]]): Future[AsyncIter[T]] {.async.} =
proc wrappedPredicate(t: T): Future[Option[T]] {.async.} =
if await predicate(t):
some(t)
else:
if j == 0:
iterLen = 0
T.none
proc genNext(): T =
let item = ringBuf[i mod n]
tryFetch(i + n)
inc i
return item
await mapFilter[T, T](iter, wrappedPredicate)
proc isFinished(): bool =
i >= iterLen
proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] =
## Delays emitting each item by given duration
##
# initialize ringBuf with n prefetched values
for j in 0..<n:
tryFetch(j)
Iter.new(genNext, isFinished)
map[T, T](iter,
proc (t: T): Future[T] {.async.} =
await sleepAsync(d)
t
)

140
codex/utils/iter.nim Normal file
View File

@ -0,0 +1,140 @@
import std/sugar
import pkg/questionable
import pkg/questionable/results
type
Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.}
IsFinished* = proc(): bool {.raises: [], gcsafe, closure.}
GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.}
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
proc finish*[T](self: Iter[T]): void =
self.finished = true
proc finished*[T](self: Iter[T]): bool =
self.finished
iterator items*[T](self: Iter[T]): T =
while not self.finished:
yield self.next()
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
inc(i)
proc new*[T](_: type Iter[T], genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
## Creates a new Iter using elements returned by supplier function `genNext`.
## Iter is finished whenever `isFinished` returns true.
##
var iter = Iter[T]()
proc next(): T {.raises: [CatchableError].} =
if not iter.finished:
var item: T
try:
item = genNext()
except CatchableError as err:
if finishOnErr or isFinished():
iter.finish
raise err
if isFinished():
iter.finish
return item
else:
raise newException(CatchableError, "Iter is finished but next item was requested")
if isFinished():
iter.finish
iter.next = next
return iter
proc new*[U, V, S: Ordinal](_: type Iter[U], a: U, b: V, step: S = 1): Iter[U] =
## Creates a new Iter in range a..b with specified step (default 1)
##
var i = a
proc genNext(): U =
let u = i
inc(i, step)
u
proc isFinished(): bool =
(step > 0 and i > b) or
(step < 0 and i < b)
Iter[U].new(genNext, isFinished)
proc new*[U, V: Ordinal](_: type Iter[U], slice: HSlice[U, V]): Iter[U] =
## Creates a new Iter from a slice
##
Iter[U].new(slice.a.int, slice.b.int, 1)
proc new*[T](_: type Iter[T], items: seq[T]): Iter[T] =
## Creates a new Iter from a sequence
##
Iter[int].new(0..<items.len)
.map((i: int) => items[i])
proc empty*[T](_: type Iter[T]): Iter[T] =
## Creates an empty Iter
##
proc genNext(): T {.raises: [CatchableError].} =
raise newException(CatchableError, "Next item requested from an empty Iter")
proc isFinished(): bool = true
Iter[T].new(genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter[U].new(
genNext = () => fn(iter.next()),
isFinished = () => iter.finished
)
proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] =
var nextUOrErr: Option[Result[U, ref CatchableError]]
proc tryFetch(): void =
nextUOrErr = Result[U, ref CatchableError].none
while not iter.finished:
try:
let t = iter.next()
if u =? mapPredicate(t):
nextUOrErr = some(success(u))
break
except CatchableError as err:
nextUOrErr = some(U.failure(err))
proc genNext(): U {.raises: [CatchableError].} =
# at this point nextUOrErr should always be some(..)
without u =? nextUOrErr.unsafeGet, err:
raise err
tryFetch()
return u
proc isFinished(): bool =
nextUOrErr.isNone
tryFetch()
Iter[U].new(genNext, isFinished)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
proc wrappedPredicate(t: T): Option[T] =
if predicate(t):
some(t)
else:
T.none
mapFilter[T, T](iter, wrappedPredicate)

View File

@ -1,481 +0,0 @@
{
"version": 1,
"packages": {
"stew": {
"version": "0.1.0",
"vcsRevision": "6ad35b876fb6ebe0dfee0f697af173acc47906ee",
"url": "https://github.com/status-im/nim-stew.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "46d58c4feb457f3241e3347778334e325dce5268"
}
},
"unittest2": {
"version": "0.0.4",
"vcsRevision": "f180f596c88dfd266f746ed6f8dbebce39c824db",
"url": "https://github.com/status-im/nim-unittest2.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "fa309c41eaf6ef57895b9e603f2620a2f6e11780"
}
},
"httputils": {
"version": "0.3.0",
"vcsRevision": "689da19e9e9cfff4ced85e2b25c6b2b5598ed079",
"url": "https://github.com/status-im/nim-http-utils.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "4ad3ad68d13c50184180ab4b2eacc0bd7ed2ed44"
}
},
"nimcrypto": {
"version": "0.5.4",
"vcsRevision": "a5742a9a214ac33f91615f3862c7b099aec43b00",
"url": "https://github.com/cheatfate/nimcrypto.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "f76c87707cd4e96355b8bb6ef27e7f8b0aac1e08"
}
},
"questionable": {
"version": "0.10.2",
"vcsRevision": "6018fd43e033d5a5310faa45bcaa1b44049469a4",
"url": "https://github.com/status-im/questionable.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "36a6c012637c7736a390e74a7f94667bca562073"
}
},
"upraises": {
"version": "0.1.0",
"vcsRevision": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2",
"url": "https://github.com/markspanbroek/upraises.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a0243c8039e12d547dbb2e9c73789c16bb8bc956"
}
},
"secp256k1": {
"version": "0.5.2",
"vcsRevision": "5340cf188168d6afcafc8023770d880f067c0b2f",
"url": "https://github.com/status-im/nim-secp256k1.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"nimcrypto"
],
"checksums": {
"sha1": "ae9cbea4487be94a06653ffee075a7f1bd1e231e"
}
},
"stint": {
"version": "0.0.1",
"vcsRevision": "036c71d06a6b22f8f967ba9d54afd2189c3872ca",
"url": "https://github.com/status-im/stint.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "0f187a2115315ca898e5f9a30c5e506cf6057062"
}
},
"contractabi": {
"version": "0.4.4",
"vcsRevision": "b111c27b619fc1d81fb1c6942372824a18a71960",
"url": "https://github.com/status-im/nim-contract-abi",
"downloadMethod": "git",
"dependencies": [
"stint",
"stew",
"nimcrypto",
"questionable",
"upraises"
],
"checksums": {
"sha1": "3ed10b11eec8fe14a81e4e58dbc41f88fd6ddf7a"
}
},
"nitro": {
"version": "0.5.1",
"vcsRevision": "6b4c455bf4dad7449c1580055733a1738fcd5aec",
"url": "https://github.com/status-im/nim-nitro.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"questionable",
"upraises",
"contractabi",
"secp256k1",
"stint",
"stew"
],
"checksums": {
"sha1": "19d90deaeb84b19214dc2aab28a466f0bc4a7e2e"
}
},
"bearssl": {
"version": "0.1.5",
"vcsRevision": "32e125015ae4251675763842366380795a91b722",
"url": "https://github.com/status-im/nim-bearssl.git",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "c58a61e71c49ed7c7fe7608df40d60945f7c4bad"
}
},
"chronos": {
"version": "3.0.11",
"vcsRevision": "17fed89c99beac5a92d3668d0d3e9b0e4ac13936",
"url": "https://github.com/status-im/nim-chronos.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"bearssl",
"httputils",
"unittest2"
],
"checksums": {
"sha1": "f6fffc87571e5f76af2a77c4ebcc0e00909ced4e"
}
},
"testutils": {
"version": "0.4.2",
"vcsRevision": "aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2",
"url": "https://github.com/status-im/nim-testutils",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "94427e0cce0e0c5841edcd3a6530b4e6b857a3cb"
}
},
"faststreams": {
"version": "0.3.0",
"vcsRevision": "1b561a9e71b6bdad1c1cdff753418906037e9d09",
"url": "https://github.com/status-im/nim-faststreams.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"testutils",
"chronos",
"unittest2"
],
"checksums": {
"sha1": "97edf9797924af48566a0af8267203dc21d80c77"
}
},
"serialization": {
"version": "0.1.0",
"vcsRevision": "fcd0eadadde0ee000a63df8ab21dc4e9f015a790",
"url": "https://github.com/status-im/nim-serialization.git",
"downloadMethod": "git",
"dependencies": [
"faststreams",
"unittest2",
"stew"
],
"checksums": {
"sha1": "fef59519892cac70cccd81b612085caaa5e3e6cf"
}
},
"json_serialization": {
"version": "0.1.0",
"vcsRevision": "c5f0e2465e8375dfc7aa0f56ccef67cb680bc6b0",
"url": "https://github.com/status-im/nim-json-serialization.git",
"downloadMethod": "git",
"dependencies": [
"serialization",
"stew"
],
"checksums": {
"sha1": "d89d79d0679a3a41b350e3ad4be56c0308cc5ec6"
}
},
"chronicles": {
"version": "0.10.2",
"vcsRevision": "1682096306ddba8185dcfac360a8c3f952d721e4",
"url": "https://github.com/status-im/nim-chronicles.git",
"downloadMethod": "git",
"dependencies": [
"testutils",
"json_serialization"
],
"checksums": {
"sha1": "9a5bebb76b0f7d587a31e621d260119279e91c76"
}
},
"presto": {
"version": "0.0.4",
"vcsRevision": "962bb588d19c7180e39f0d9f18131e75861bab20",
"url": "https://github.com/status-im/nim-presto.git",
"downloadMethod": "git",
"dependencies": [
"chronos",
"chronicles",
"stew"
],
"checksums": {
"sha1": "8d3e77d7ddf14606504fe86c430b1b5712aada92"
}
},
"zlib": {
"version": "0.1.0",
"vcsRevision": "74cdeb54b21bededb5a515d36f608bc1850555a2",
"url": "https://github.com/status-im/nim-zlib",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "01d330dc4c1924e56b1559ee73bc760e526f635c"
}
},
"libbacktrace": {
"version": "0.0.8",
"vcsRevision": "ce966b1c469dda179b54346feaaf1a62202c984f",
"url": "https://github.com/status-im/nim-libbacktrace",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "ba7a2f3d21db894ace7bb4ebe0a5b06af995d68b"
}
},
"dnsclient": {
"version": "0.1.2",
"vcsRevision": "fbb76f8af8a33ab818184a7d4406d9fee20993be",
"url": "https://github.com/ba0f3/dnsclient.nim.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "663239a914c814204b30dda6e0902cc0fbd0b8ee"
}
},
"metrics": {
"version": "0.0.1",
"vcsRevision": "743f81d4f6c6ebf0ac02389f2392ff8b4235bee5",
"url": "https://github.com/status-im/nim-metrics.git",
"downloadMethod": "git",
"dependencies": [
"chronos"
],
"checksums": {
"sha1": "6274c7ae424b871bc21ca3a6b6713971ff6a8095"
}
},
"asynctest": {
"version": "0.3.1",
"vcsRevision": "5347c59b4b057443a014722aa40800cd8bb95c69",
"url": "https://github.com/status-im/asynctest.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "53e0b610d13700296755a4ebe789882cae47a3b9"
}
},
"websock": {
"version": "0.1.0",
"vcsRevision": "8a433c6ba43940b13ce56f83d79a93273ece5684",
"url": "https://github.com/status-im/nim-websock.git",
"downloadMethod": "git",
"dependencies": [
"chronos",
"httputils",
"chronicles",
"stew",
"nimcrypto",
"bearssl",
"zlib"
],
"checksums": {
"sha1": "1dbb4e1dd8c525c5674dca42b8eb25bdeb2f76b3"
}
},
"libp2p": {
"version": "0.0.2",
"vcsRevision": "eeb3c210a37408716b6a8b45f578adf87610cef2",
"url": "https://github.com/status-im/nim-libp2p.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"dnsclient",
"bearssl",
"chronicles",
"chronos",
"metrics",
"secp256k1",
"stew",
"websock"
],
"checksums": {
"sha1": "e9e9b93e6e425e47df1eea01a8e9efeac6e0fc97"
}
},
"combparser": {
"version": "0.2.0",
"vcsRevision": "ba4464c005d7617c008e2ed2ebc1ba52feb469c6",
"url": "https://github.com/PMunch/combparser.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a3635260961a893b88f69aac19f1b24e032a7e97"
}
},
"protobuf_serialization": {
"version": "0.2.0",
"vcsRevision": "f7d671f877e01213494aac7903421ccdbe70616f",
"url": "https://github.com/status-im/nim-protobuf-serialization.git",
"downloadMethod": "git",
"dependencies": [
"stew",
"faststreams",
"serialization",
"combparser"
],
"checksums": {
"sha1": "9418459027d0d5eb30a974649dc615a76e8e4aca"
}
},
"confutils": {
"version": "0.1.0",
"vcsRevision": "0435e67832b6bb8dfdf0ddb102903e9d820206d2",
"url": "https://github.com/status-im/nim-confutils.git",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "1edab14b434aca6ae28e2385982fa60d623c600a"
}
},
"news": {
"version": "0.5",
"vcsRevision": "e79420e835489132aaa412f993b565f5dd6295f4",
"url": "https://github.com/status-im/news",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "a5f1789bf650822156712fd3bdec1bf6ab4ac42e"
}
},
"json_rpc": {
"version": "0.0.2",
"vcsRevision": "5a281760803907f4989cacf109b516381dfbbe11",
"url": "https://github.com/status-im/nim-json-rpc",
"downloadMethod": "git",
"dependencies": [
"stew",
"nimcrypto",
"stint",
"chronos",
"httputils",
"chronicles",
"news",
"websock",
"json_serialization"
],
"checksums": {
"sha1": "3ec28a4c9e5dcd3210e85dfcfdd0a6baf46eccbe"
}
},
"ethers": {
"version": "0.1.7",
"vcsRevision": "270d358b869d02a4c625dde971f799db336670fb",
"url": "https://github.com/status-im/nim-ethers",
"downloadMethod": "git",
"dependencies": [
"chronos",
"contractabi",
"questionable",
"upraises",
"json_rpc",
"stint",
"stew"
],
"checksums": {
"sha1": "3eb78a87744d5894595f33a36b21348a59d8f1a5"
}
},
"libp2pdht": {
"version": "0.0.1",
"vcsRevision": "9a872518d621bf8b390f88cd65617bca6aca1d2d",
"url": "https://github.com/status-im/nim-libp2p-dht.git",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"bearssl",
"chronicles",
"chronos",
"libp2p",
"metrics",
"protobuf_serialization",
"secp256k1",
"stew",
"stint",
"asynctest"
],
"checksums": {
"sha1": "d97e8b751e11ccc7e059b79fb1a046d2b0d0e872"
}
},
"lrucache": {
"version": "1.2.1",
"vcsRevision": "8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd",
"url": "https://github.com/status-im/lrucache.nim",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "2c4365d10029d6f6a8b92a712e9002ac3886b07d"
}
},
"leopard": {
"version": "0.0.1",
"vcsRevision": "2a6a63923e9b95676b5ae7ff2c346be0e63e753c",
"url": "https://github.com/status-im/nim-leopard",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2",
"upraises"
],
"checksums": {
"sha1": "e71db348018eab26f3059e1c03bf3088c5109cfe"
}
},
"taskpools": {
"version": "0.0.3",
"vcsRevision": "8d408ac6cfc9c24ec8b7b65d5993e85050dcbaa9",
"url": "https://github.com/status-im/nim-taskpools.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "37bbbbb03d9b893af6980592624211ab057392c0"
}
},
"blscurve": {
"version": "0.0.1",
"vcsRevision": "0237e4e0e914fc19359c18a66406d33bc942775c",
"url": "https://github.com/status-im/nim-blscurve",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"stew",
"taskpools"
],
"checksums": {
"sha1": "65f58854ffd2098e0d0ca08f6ea0efb3c27529e0"
}
}
}
}

View File

@ -0,0 +1,65 @@
import std/sugar
import pkg/stew/results
import pkg/questionable
import pkg/chronos
import pkg/datastore/typedds
import pkg/datastore/sql/sqliteds
import pkg/codex/stores/queryiterhelper
import pkg/codex/utils/asynciter
import ../../asynctest
import ../helpers
proc encode(s: string): seq[byte] =
s.toBytes()
proc decode(T: type string, bytes: seq[byte]): ?!T =
success(string.fromBytes(bytes))
asyncchecksuite "Test QueryIter helper":
var
tds: TypedDatastore
setupAll:
tds = TypedDatastore.init(SQLiteDatastore.new(Memory).tryGet())
teardownAll:
(await tds.close()).tryGet
test "Should auto-dispose when QueryIter finishes":
let
source = {
"a": "11",
"b": "22"
}.toTable
Root = Key.init("/queryitertest").tryGet()
for k, v in source:
let key = (Root / k).tryGet()
(await tds.put(key, v)).tryGet()
var
disposed = false
queryIter = (await query[string](tds, Query.init(Root))).tryGet()
let iterDispose: IterDispose = queryIter.dispose
queryIter.dispose = () => (disposed = true; iterDispose())
let
iter1 = (await toAsyncIter[string](queryIter)).tryGet()
iter2 = await filterSuccess[string](iter1)
var items = initTable[string, string]()
for fut in iter2:
let item = await fut
items[item.key.value] = item.value
check:
items == source
disposed == true
queryIter.finished == true
iter1.finished == true
iter2.finished == true

View File

@ -1,5 +1,6 @@
import ./stores/testcachestore
import ./stores/testrepostore
import ./stores/testmaintenance
import ./stores/testqueryiterhelper
{.warning[UnusedImport]: off.}

View File

@ -1,6 +1,7 @@
import ./utils/testoptions
import ./utils/testkeyutils
import ./utils/testasyncstatemachine
import ./utils/testasynciter
import ./utils/testtimer
import ./utils/testthen
import ./utils/testtrackedfutures

View File

@ -0,0 +1,160 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/codex/utils/asynciter
import ../../asynctest
import ../helpers
asyncchecksuite "Test AsyncIter":
test "Should be finished":
let iter = AsyncIter[int].empty()
check:
iter.finished == true
test "Should map each item using `map`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = map[int, string](iter1,
proc (i: int): Future[string] {.async.} =
$i
)
var collected: seq[string]
for fut in iter2:
collected.add(await fut)
check:
collected == @["0", "1", "2", "3", "4"]
test "Should leave only odd items using `filter`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = await filter[int](iter1,
proc (i: int): Future[bool] {.async.} =
(i mod 2) == 1
)
var collected: seq[int]
for fut in iter2:
collected.add(await fut)
check:
collected == @[1, 3]
test "Should leave only odd items using `mapFilter`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = await mapFilter[int, string](iter1,
proc (i: int): Future[?string] {.async.} =
if (i mod 2) == 1:
some($i)
else:
string.none
)
var collected: seq[string]
for fut in iter2:
collected.add(await fut)
check:
collected == @["1", "3"]
test "Should yield all items before err using `map`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = map[int, string](iter1,
proc (i: int): Future[string] {.async.} =
if i < 3:
return $i
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[string]
expect CatchableError:
for fut in iter2:
collected.add(await fut)
check:
collected == @["0", "1", "2"]
iter2.finished
test "Should yield all items before err using `filter`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = await filter[int](iter1,
proc (i: int): Future[bool] {.async.} =
if i < 3:
return true
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[int]
expect CatchableError:
for fut in iter2:
collected.add(await fut)
check:
collected == @[0, 1, 2]
iter2.finished
test "Should yield all items before err using `mapFilter`":
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = await mapFilter[int, string](iter1,
proc (i: int): Future[?string] {.async.} =
if i < 3:
return some($i)
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[string]
expect CatchableError:
for fut in iter2:
collected.add(await fut)
check:
collected == @["0", "1", "2"]
iter2.finished
test "Should propagate cancellation error immediately":
let
fut = newFuture[?string]("testasynciter")
let
iter1 = AsyncIter[int].new(0..<5).delayBy(10.millis)
iter2 = await mapFilter[int, string](iter1,
proc (i: int): Future[?string] {.async.} =
if i < 3:
return some($i)
else:
return await fut
)
proc cancelFut(): Future[void] {.async.} =
await sleepAsync(100.millis)
await fut.cancelAndWait()
asyncSpawn(cancelFut())
var collected: seq[string]
expect CancelledError:
for fut in iter2:
collected.add(await fut)
check:
collected == @["0", "1"]
iter2.finished

View File

@ -0,0 +1,129 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/codex/utils/iter
import ../../asynctest
import ../helpers
checksuite "Test Iter":
test "Should be finished":
let iter = Iter[int].empty()
check:
iter.finished == true
test "Should be iterable with `items`":
let iter = Iter.new(0..<5)
let items =
collect:
for v in iter:
v
check:
items == @[0, 1, 2, 3, 4]
test "Should be iterable with `pairs`":
let iter = Iter.new(0..<5)
let pairs =
collect:
for i, v in iter:
(i, v)
check:
pairs == @[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
test "Should map each item using `map`":
let iter = Iter.new(0..<5)
.map((i: int) => $i)
check:
iter.toSeq() == @["0", "1", "2", "3", "4"]
test "Should leave only odd items using `filter`":
let iter = Iter.new(0..<5)
.filter((i: int) => (i mod 2) == 1)
check:
iter.toSeq() == @[1, 3]
test "Should leave only odd items using `mapFilter`":
let
iter1 = Iter.new(0..<5)
iter2 = mapFilter[int, string](iter1,
proc(i: int): ?string =
if (i mod 2) == 1:
some($i)
else:
string.none
)
check:
iter2.toSeq() == @["1", "3"]
test "Should yield all items before err using `map`":
let
iter = Iter.new(0..<5)
.map(
proc (i: int): string =
if i < 3:
return $i
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[string]
expect CatchableError:
for i in iter:
collected.add(i)
check:
collected == @["0", "1", "2"]
iter.finished
test "Should yield all items before err using `filter`":
let
iter = Iter.new(0..<5)
.filter(
proc (i: int): bool =
if i < 3:
return true
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[int]
expect CatchableError:
for i in iter:
collected.add(i)
check:
collected == @[0, 1, 2]
iter.finished
test "Should yield all items before err using `mapFilter`":
let
iter1 = Iter.new(0..<5)
iter2 = mapFilter[int, string](iter1,
proc (i: int): ?string =
if i < 3:
return some($i)
else:
raise newException(CatchableError, "Some error")
)
var collected: seq[string]
expect CatchableError:
for i in iter2:
collected.add(i)
check:
collected == @["0", "1", "2"]
iter2.finished

View File

@ -1,3 +0,0 @@
deps=""
resolver="MaxVer"
overrides="urls.rules"

@ -1 +1 @@
Subproject commit f4989fcce5d74a648e7e2598a72a7b21948f4a85
Subproject commit 3ab6b84a634a7b2ee8c0144f050bf5893cd47c17