Merge branch 'nimbus-build-system'

This commit is contained in:
thatben 2025-06-02 18:19:05 +02:00
commit 12f5cb5327
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
92 changed files with 994 additions and 379 deletions

6
.dockerignore Normal file
View File

@ -0,0 +1,6 @@
.github
build
docs
metrics
nimcache
tests

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
env.sh text eol=lf
docker/docker-entrypoint.sh text eol=lf

View File

@ -1,21 +1,21 @@
name: Docker
on:
push:
branches:
- master
tags:
- 'v*.*.*'
workflow_dispatch:
jobs:
build-and-push:
name: Build and Push
uses: codex-storage/github-actions/.github/workflows/docker-reusable.yml@master
with:
docker_file: docker/crawler.Dockerfile
dockerhub_repo: codexstorage/codex-network-crawler
tag_latest: ${{ github.ref_name == github.event.repository.default_branch || startsWith(github.ref, 'refs/tags/') }}
secrets: inherit
name: Docker
on:
push:
branches:
- master
tags:
- 'v*.*.*'
workflow_dispatch:
jobs:
build-and-push:
name: Build and Push
uses: codex-storage/github-actions/.github/workflows/docker-reusable.yml@master
with:
docker_file: docker/crawler.Dockerfile
dockerhub_repo: codexstorage/codex-network-crawler
tag_latest: ${{ github.ref_name == github.event.repository.default_branch || startsWith(github.ref, 'refs/tags/') }}
secrets: inherit

View File

@ -1,51 +1,43 @@
name: CI
on: [push, pull_request]
jobs:
linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check `nph` formatting
uses: arnetheduck/nph-action@v1
with:
version: 0.6.1
options: "./"
fail: true
suggest: true
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest, windows-latest]
nim: [2.0.14]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: iffy/install-nim@v4
with:
version: ${{ matrix.nim }}
- name: Enable git long paths
run: |
git config --system core.longpaths true
- name: Update nimble
run: |
nimble install nimble
nimble --version
- name: Use updated nimble version on Windows
if: contains(matrix.os, 'windows')
run: |
del $HOME\.nimble\bin\nimble.exe
nimble --version
- name: Build
run: nimble build -y
- name: Test
run: nimble test -y
name: CI
on: [push, pull_request]
jobs:
linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check `nph` formatting
uses: arnetheduck/nph-action@v1
with:
version: 0.6.1
options: "./"
fail: true
suggest: true
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest, windows-latest]
nim: [2.2.4]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Build Nim and dependencies
run: |
which gcc
gcc --version
make update
./env.sh nim --version
make
- name: Run tests
run: |
make test
- name: Build crawler
run: |
make

24
.gitignore vendored
View File

@ -1,17 +1,9 @@
*
!*.*
!*/
coverage
nimcache
tests/testAll
nimble.develop
nimble.paths
nim.cfg
nimbus-build-system.paths
vendor/*
NimBinaries
*
!*.*
!*/
nimble.paths
nimbus-build-system.paths
vendor/*
*.exe
crawler_data
.update.timestamp
*.dSYM
.vscode/*
*.exe
crawler_data

141
.gitmodules vendored Normal file
View File

@ -0,0 +1,141 @@
[submodule "vendor/nimbus-build-system"]
path = vendor/nimbus-build-system
url = https://github.com/status-im/nimbus-build-system.git
[submodule "vendor/nim-libp2p"]
path = vendor/nim-libp2p
url = https://github.com/vacp2p/nim-libp2p.git
[submodule "vendor/nimcrypto"]
path = vendor/nimcrypto
url = https://github.com/cheatfate/nimcrypto.git
[submodule "vendor/nim-chronicles"]
path = vendor/nim-chronicles
url = https://github.com/status-im/nim-chronicles.git
[submodule "vendor/nim-metrics"]
path = vendor/nim-metrics
url = https://github.com/status-im/nim-metrics.git
[submodule "vendor/nim-secp256k1"]
path = vendor/nim-secp256k1
url = https://github.com/status-im/nim-secp256k1.git
[submodule "vendor/nim-stew"]
path = vendor/nim-stew
url = https://github.com/status-im/nim-stew.git
[submodule "vendor/questionable"]
path = vendor/questionable
url = https://github.com/status-im/questionable.git
[submodule "vendor/upraises"]
path = vendor/upraises
url = https://github.com/markspanbroek/upraises.git
[submodule "vendor/asynctest"]
path = vendor/asynctest
url = https://github.com/status-im/asynctest.git
[submodule "vendor/nim-confutils"]
path = vendor/nim-confutils
url = https://github.com/status-im/nim-confutils.git
[submodule "vendor/nim-nat-traversal"]
path = vendor/nim-nat-traversal
url = https://github.com/status-im/nim-nat-traversal.git
[submodule "vendor/nim-libbacktrace"]
path = vendor/nim-libbacktrace
url = https://github.com/status-im/nim-libbacktrace.git
[submodule "vendor/nim-chronos"]
path = vendor/nim-chronos
url = https://github.com/status-im/nim-chronos.git
[submodule "vendor/nim-json-serialization"]
path = vendor/nim-json-serialization
url = https://github.com/status-im/nim-json-serialization.git
[submodule "vendor/nim-serialization"]
path = vendor/nim-serialization
url = https://github.com/status-im/nim-serialization.git
[submodule "vendor/nim-bearssl"]
path = vendor/nim-bearssl
url = https://github.com/status-im/nim-bearssl.git
[submodule "vendor/stint"]
path = vendor/stint
url = https://github.com/status-im/stint.git
[submodule "vendor/nim-unittest2"]
path = vendor/nim-unittest2
url = https://github.com/status-im/nim-unittest2.git
[submodule "vendor/nim-websock"]
path = vendor/nim-websock
url = https://github.com/status-im/nim-websock.git
[submodule "vendor/nim-contract-abi"]
path = vendor/nim-contract-abi
url = https://github.com/status-im/nim-contract-abi
[submodule "vendor/nim-json-rpc"]
path = vendor/nim-json-rpc
url = https://github.com/status-im/nim-json-rpc
[submodule "vendor/nim-ethers"]
path = vendor/nim-ethers
url = https://github.com/status-im/nim-ethers
[submodule "vendor/lrucache.nim"]
path = vendor/lrucache.nim
url = https://github.com/status-im/lrucache.nim
[submodule "vendor/nim-blscurve"]
path = vendor/nim-blscurve
url = https://github.com/status-im/nim-blscurve.git
[submodule "vendor/nim-codex-dht"]
path = vendor/nim-codex-dht
url = https://github.com/codex-storage/nim-codex-dht.git
[submodule "vendor/nim-eth"]
path = vendor/nim-eth
url = https://github.com/status-im/nim-eth
[submodule "vendor/codex-contracts-eth"]
path = vendor/codex-contracts-eth
url = https://github.com/status-im/codex-contracts-eth
[submodule "vendor/nim-protobuf-serialization"]
path = vendor/nim-protobuf-serialization
url = https://github.com/status-im/nim-protobuf-serialization
[submodule "vendor/nim-results"]
path = vendor/nim-results
url = https://github.com/arnetheduck/nim-results
[submodule "vendor/nim-testutils"]
path = vendor/nim-testutils
url = https://github.com/status-im/nim-testutils
[submodule "vendor/nim-serde"]
path = vendor/nim-serde
url = https://github.com/codex-storage/nim-serde.git
[submodule "vendor/nph"]
path = vendor/nph
url = https://github.com/arnetheduck/nph.git
[submodule "vendor/nim-faststreams"]
path = vendor/nim-faststreams
url = https://github.com/status-im/nim-faststreams.git
[submodule "vendor/nim-poseidon2"]
path = vendor/nim-poseidon2
url = https://github.com/codex-storage/nim-poseidon2.git
[submodule "vendor/constantine"]
path = vendor/constantine
url = https://github.com/mratsim/constantine.git
[submodule "vendor/nim-taskpools"]
path = vendor/nim-taskpools
url = https://github.com/status-im/nim-taskpools.git
[submodule "vendor/nim-quic"]
path = vendor/nim-quic
url = https://github.com/vacp2p/nim-quic.git
[submodule "vendor/nim-ngtcp2"]
path = vendor/nim-ngtcp2
url = https://github.com/vacp2p/nim-ngtcp2.git
[submodule "vendor/nim-datastore"]
path = vendor/nim-datastore
url = https://github.com/status-im/nim-datastore.git
[submodule "vendor/nim-sqlite3-abi"]
path = vendor/nim-sqlite3-abi
url = https://github.com/arnetheduck/nim-sqlite3-abi.git
[submodule "vendor/nim-leveldbstatic"]
path = vendor/nim-leveldbstatic
url = https://github.com/codex-storage/nim-leveldb.git
[submodule "vendor/nim-docopt"]
path = vendor/nim-docopt
url = https://github.com/docopt/docopt.nim.git
[submodule "vendor/nim-regex"]
path = vendor/nim-regex
url = https://github.com/nitely/nim-regex.git
[submodule "vendor/nim-unicodedb"]
path = vendor/nim-unicodedb
url = https://github.com/nitely/nim-unicodedb.git
[submodule "vendor/nim-http-utils"]
path = vendor/nim-http-utils
url = https://github.com/status-im/nim-http-utils.git
[submodule "vendor/nim-zlib"]
path = vendor/nim-zlib
url = https://github.com/status-im/nim-zlib

197
Makefile Normal file
View File

@ -0,0 +1,197 @@
# Copyright (c) 2020 Status Research & Development GmbH. Licensed under
# either of:
# - Apache License, version 2.0
# - MIT license
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
# This is the Nim version used locally and in regular CI builds.
# Can be a specific version tag, a branch name, or a commit hash.
# Can be overridden by setting the NIM_COMMIT environment variable
# before calling make.
#
# For readability in CI, if NIM_COMMIT is set to "pinned",
# this will also default to the version pinned here.
#
# If NIM_COMMIT is set to "nimbusbuild", this will use the
# version pinned by nimbus-build-system.
PINNED_NIM_VERSION := v2.2.4
ifeq ($(NIM_COMMIT),)
NIM_COMMIT := $(PINNED_NIM_VERSION)
else ifeq ($(NIM_COMMIT),pinned)
NIM_COMMIT := $(PINNED_NIM_VERSION)
endif
ifeq ($(NIM_COMMIT),nimbusbuild)
undefine NIM_COMMIT
else
export NIM_COMMIT
endif
SHELL := bash # the shell used internally by Make
# used inside the included makefiles
BUILD_SYSTEM_DIR := vendor/nimbus-build-system
# -d:insecure - Necessary to enable Prometheus HTTP endpoint for metrics
# -d:chronicles_colors:none - Necessary to disable colors in logs for Docker
DOCKER_IMAGE_NIM_PARAMS ?= -d:chronicles_colors:none -d:insecure
LINK_PCRE := 0
ifeq ($(OS),Windows_NT)
ifeq ($(PROCESSOR_ARCHITECTURE), AMD64)
ARCH = x86_64
endif
ifeq ($(PROCESSOR_ARCHITECTURE), ARM64)
ARCH = arm64
endif
else
UNAME_P := $(shell uname -m)
ifneq ($(filter $(UNAME_P), i686 i386 x86_64),)
ARCH = x86_64
endif
ifneq ($(filter $(UNAME_P), aarch64 arm),)
ARCH = arm64
endif
endif
ifeq ($(ARCH), x86_64)
CXXFLAGS ?= -std=c++17 -mssse3
else
CXXFLAGS ?= -std=c++17
endif
export CXXFLAGS
# we don't want an error here, so we can handle things later, in the ".DEFAULT" target
-include $(BUILD_SYSTEM_DIR)/makefiles/variables.mk
.PHONY: \
all \
clean \
coverage \
deps \
libbacktrace \
test \
update
ifeq ($(NIM_PARAMS),)
# "variables.mk" was not included, so we update the submodules.
GIT_SUBMODULE_UPDATE := git submodule update --init --recursive
.DEFAULT:
+@ echo -e "Git submodules not found. Running '$(GIT_SUBMODULE_UPDATE)'.\n"; \
$(GIT_SUBMODULE_UPDATE); \
echo
# Now that the included *.mk files appeared, and are newer than this file, Make will restart itself:
# https://www.gnu.org/software/make/manual/make.html#Remaking-Makefiles
#
# After restarting, it will execute its original goal, so we don't have to start a child Make here
# with "$(MAKE) $(MAKECMDGOALS)". Isn't hidden control flow great?
else # "variables.mk" was included. Business as usual until the end of this file.
# default target, because it's the first one that doesn't start with '.'
# Builds the crawler binary
all: | build deps
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim codexcrawler $(NIM_PARAMS) build.nims
# must be included after the default target
-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk
# "-d:release" implies "--stacktrace:off" and it cannot be added to config.nims
ifeq ($(USE_LIBBACKTRACE), 0)
NIM_PARAMS := $(NIM_PARAMS) -d:debug -d:disable_libbacktrace
else
NIM_PARAMS := $(NIM_PARAMS) -d:release
endif
deps: | deps-common nat-libs
ifneq ($(USE_LIBBACKTRACE), 0)
deps: | libbacktrace
endif
update: | update-common
# detecting the os
ifeq ($(OS),Windows_NT) # is Windows_NT on XP, 2000, 7, Vista, 10...
detected_OS := Windows
else ifeq ($(strip $(shell uname)),Darwin)
detected_OS := macOS
else
# e.g. Linux
detected_OS := $(strip $(shell uname))
endif
# Builds and run a part of the test suite
test: | build deps
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim test $(NIM_PARAMS) build.nims
# nim-libbacktrace
LIBBACKTRACE_MAKE_FLAGS := -C vendor/nim-libbacktrace --no-print-directory BUILD_CXX_LIB=0
libbacktrace:
ifeq ($(detected_OS), Windows)
# MSYS2 detection
ifneq ($(MSYSTEM),)
+ $(MAKE) $(LIBBACKTRACE_MAKE_FLAGS) CMAKE_ARGS="-G'MSYS Makefiles'"
else
+ $(MAKE) $(LIBBACKTRACE_MAKE_FLAGS)
endif
else
+ $(MAKE) $(LIBBACKTRACE_MAKE_FLAGS)
endif
# usual cleaning
clean: | clean-common
rm -rf build
ifneq ($(USE_LIBBACKTRACE), 0)
+ $(MAKE) -C vendor/nim-libbacktrace clean $(HANDLE_OUTPUT)
endif
############
## Format ##
############
.PHONY: build-nph install-nph-hook clean-nph print-nph-path
# Default location for nph binary shall be next to nim binary to make it available on the path.
NPH:=$(shell dirname $(NIM_BINARY))/nph
build-nph:
ifeq ("$(wildcard $(NPH))","")
$(ENV_SCRIPT) nim c vendor/nph/src/nph.nim && \
mv vendor/nph/src/nph $(shell dirname $(NPH))
echo "nph utility is available at " $(NPH)
endif
GIT_PRE_COMMIT_HOOK := .git/hooks/pre-commit
install-nph-hook: build-nph
ifeq ("$(wildcard $(GIT_PRE_COMMIT_HOOK))","")
cp ./tools/scripts/git_pre_commit_format.sh $(GIT_PRE_COMMIT_HOOK)
else
echo "$(GIT_PRE_COMMIT_HOOK) already present, will NOT override"
exit 1
endif
nph/%: build-nph
echo -e $(FORMAT_MSG) "nph/$*" && \
$(NPH) $*
format:
$(NPH) *.nim
$(NPH) codexcrawler/
$(NPH) tests/
clean-nph:
rm -f $(NPH)
# To avoid hardcoding nph binary location in several places
print-nph-path:
echo "$(NPH)"
clean: | clean-nph
endif # "variables.mk" was not included

View File

@ -1,27 +1,27 @@
# Codex Network Crawler
![Crawler](crawler.png)
[![License: Apache](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Stability: experimental](https://img.shields.io/badge/stability-experimental-orange.svg)](#stability)
[![CI (GitHub Actions)](https://github.com/codex-storage/nim-codex-dht/workflows/CI/badge.svg?branch=master)](https://github.com/codex-storage/nim-codex-dht/actions/workflows/ci.yml?query=workflow%3ACI+branch%3Amaster)
[![codecov](https://codecov.io/gh/codex-storage/nim-codex-dht/branch/master/graph/badge.svg?token=tlmMJgU4l7)](https://codecov.io/gh/codex-storage/nim-codex-dht)
# !! Work in Progress !!
This project uses nim-codex-dht, nim-libp2p, nim-ethers, and nim-metrics to create a metrics service. The crawler will traverse the Codex network and produce metrics such as:
- Number of DHT nodes (alive vs total)
- P2P connectivity (percentage)
- Storage contract statistics (created, total size, average size, average duration, pricing information??)
Metrics are published from a scrape target.
# Usage
```sh
nimble format
nimble build
nimble test
nimble run
```
# Codex Network Crawler
![Crawler](crawler.png)
[![License: Apache](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Stability: experimental](https://img.shields.io/badge/stability-experimental-orange.svg)](#stability)
[![CI (GitHub Actions)](https://github.com/codex-storage/nim-codex-dht/workflows/CI/badge.svg?branch=master)](https://github.com/codex-storage/nim-codex-dht/actions/workflows/ci.yml?query=workflow%3ACI+branch%3Amaster)
[![codecov](https://codecov.io/gh/codex-storage/nim-codex-dht/branch/master/graph/badge.svg?token=tlmMJgU4l7)](https://codecov.io/gh/codex-storage/nim-codex-dht)
# !! Work in Progress !!
This project uses nim-codex-dht, nim-libp2p, nim-ethers, and nim-metrics to create a metrics service. The crawler will traverse the Codex network and produce metrics such as:
- Number of DHT nodes (alive vs total)
- P2P connectivity (percentage)
- Storage contract statistics (created, total size, average size, average duration, pricing information??)
Metrics are published from a scrape target.
# Usage
```sh
nimble format
nimble build
nimble test
nimble run
```

43
build.nims Normal file
View File

@ -0,0 +1,43 @@
mode = ScriptMode.Verbose
import std/os except commandLineParams
### Helper functions
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
if not dirExists "build":
mkDir "build"
# allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims"
var extra_params = params
when compiles(commandLineParams):
for param in commandLineParams():
extra_params &= " " & param
else:
for i in 2 ..< paramCount():
extra_params &= " " & paramStr(i)
let
# Place build output in 'build' folder, even if name includes a longer path.
outName = os.lastPathPart(name)
cmd =
"nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir &
name & ".nim"
exec(cmd)
proc test(name: string, srcDir = "tests/", params = "", lang = "c") =
buildBinary name, srcDir, params
exec "build/" & name
task codexcrawler, "build codexcrawler binary":
buildBinary "codexcrawler",
params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"
task testCodexcrawler, "Build & run Codex Crawler tests":
test "testCodexCrawler"
task build, "build codex crawler binary":
codexCrawlerTask()
task test, "Run tests":
testCodexCrawlerTask()

View File

@ -18,7 +18,9 @@ type Application* = ref object
state: State
components: seq[Component]
proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
proc initializeApp(
app: Application, config: Config
): Future[?!void] {.async: (raises: [CancelledError]).} =
app.state = State(
status: ApplicationStatus.Running,
config: config,
@ -48,7 +50,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
return success()
proc stopComponents(app: Application) {.async.} =
proc stopComponents(app: Application) {.async: (raises: [CancelledError]).} =
for c in app.components:
if err =? (await c.stop()).errorOption:
error "Failed to stop component", err = err.msg

View File

@ -3,17 +3,21 @@ import pkg/questionable/results
type Component* = ref object of RootObj
method awake*(c: Component): Future[?!void] {.async, base.} =
method awake*(
c: Component
): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Awake is called on all components in an unspecified order.
# Use this method to subscribe/connect to other components.
return success()
method start*(c: Component): Future[?!void] {.async, base.} =
method start*(
c: Component
): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Start is called on all components in an unspecified order.
# Is is guaranteed that all components have already successfulled handled 'awake'.
# Use this method to begin the work of this component.
return success()
method stop*(c: Component): Future[?!void] {.async, base.} =
method stop*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Use this method to stop, unsubscribe, and clean up any resources.
return success()

View File

@ -19,7 +19,7 @@ type ChainCrawler* = ref object of Component
proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.store.add(rid)
method start*(c: ChainCrawler): Future[?!void] {.async.} =
method start*(c: ChainCrawler): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} =

View File

@ -65,7 +65,7 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
c.updateMetrics(update)
return success()
method start*(c: ChainMetrics): Future[?!void] {.async.} =
method start*(c: ChainMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =

View File

@ -46,14 +46,17 @@ proc step(c: DhtCrawler): Future[?!void] {.async: (raises: []).} =
return success()
method start*(c: DhtCrawler): Future[?!void] {.async.} =
method start*(c: DhtCrawler): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await c.step()
if c.state.config.dhtEnable:
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
try:
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
except CatchableError as err:
return failure(err.msg)
return success()

View File

@ -27,7 +27,7 @@ proc updateMetrics(d: DhtMetrics) =
proc handleCheckEvent(
d: DhtMetrics, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
if event.isOk:
?await d.ok.add(event.id)
?await d.nok.remove(event.id)
@ -38,31 +38,35 @@ proc handleCheckEvent(
d.updateMetrics()
return success()
proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} =
proc handleDeleteEvent(
d: DhtMetrics, nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
for nid in nids:
?await d.ok.remove(nid)
?await d.nok.remove(nid)
d.updateMetrics()
return success()
method awake*(d: DhtMetrics): Future[?!void] {.async.} =
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
method awake*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
proc onCheck(
event: DhtNodeCheckEventData
): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.handleCheckEvent(event)
proc onDelete(nids: seq[Nid]): Future[?!void] {.async.} =
proc onDelete(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.handleDeleteEvent(nids)
d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck)
d.subDel = d.state.events.nodesDeleted.subscribe(onDelete)
return success()
method start*(d: DhtMetrics): Future[?!void] {.async.} =
method start*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
?await d.ok.load()
?await d.nok.load()
return success()
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
method stop*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.state.events.dhtNodeCheck.unsubscribe(d.subCheck)
await d.state.events.nodesDeleted.unsubscribe(d.subDel)
return success()

View File

@ -70,13 +70,18 @@ proc encode*(e: NodeEntry): seq[byte] =
e.toBytes()
proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(
NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64)
)
return NodeEntry.fromBytes(bytes)
try:
if bytes.len < 1:
return success(
NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64)
)
return NodeEntry.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
proc storeNodeIsNew(
s: NodeStore, nid: Nid
): Future[?!bool] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err)
@ -91,7 +96,9 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
return success(not exists)
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc fireNewNodesDiscovered(
s: NodeStore, nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
await s.state.events.newNodesDiscovered.fire(nids)
proc fireNodesDeleted(
@ -99,7 +106,9 @@ proc fireNodesDeleted(
): Future[?!void] {.async: (raises: []).} =
await s.state.events.nodesDeleted.fire(nids)
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc processFoundNodes(
s: NodeStore, nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
var newNodes = newSeq[Nid]()
for nid in nids:
without isNew =? (await s.storeNodeIsNew(nid)), err:
@ -114,7 +123,7 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc processNodeCheck(
s: NodeStore, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $(event.id)), err:
error "failed to format key", err = err.msg
return failure(err)
@ -142,7 +151,9 @@ proc processNodeCheck(
?await s.store.put(key, entry)
return success()
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
proc deleteEntry(
s: NodeStore, nid: Nid
): Future[?!bool] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err)
@ -202,20 +213,24 @@ method deleteEntries*(
?await s.fireNodesDeleted(deleted)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} =
method start*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNodesFound(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await s.processFoundNodes(nids)
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
proc onCheck(
event: DhtNodeCheckEventData
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await s.processNodeCheck(event)
s.subFound = s.state.events.nodesFound.subscribe(onNodesFound)
s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck)
return success()
method stop*(s: NodeStore): Future[?!void] {.async.} =
method stop*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} =
await s.state.events.nodesFound.unsubscribe(s.subFound)
await s.state.events.dhtNodeCheck.unsubscribe(s.subCheck)
return success()

View File

@ -51,9 +51,12 @@ proc encode*(e: RequestEntry): seq[byte] =
e.toBytes()
proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(RequestEntry(isValid: false))
return RequestEntry.fromBytes(bytes)
try:
if bytes.len < 1:
return success(RequestEntry(isValid: false))
return RequestEntry.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} =
without key =? Key.init(requeststoreName / $rid), err:

View File

@ -56,7 +56,7 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []
return failure(err)
return success()
method start*(t: TimeTracker): Future[?!void] {.async.} =
method start*(t: TimeTracker): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} =

View File

@ -54,10 +54,12 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
return success(item)
method awake*(t: TodoList): Future[?!void] {.async.} =
method awake*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} =
info "initializing..."
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNewNodes(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
t.addNodes(nids)
return success()
@ -65,7 +67,7 @@ method awake*(t: TodoList): Future[?!void] {.async.} =
t.subRev = t.state.events.nodesToRevisit.subscribe(onNewNodes)
return success()
method stop*(t: TodoList): Future[?!void] {.async.} =
method stop*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} =
await t.state.events.newNodesDiscovered.unsubscribe(t.subNew)
await t.state.events.nodesToRevisit.unsubscribe(t.subRev)
return success()

View File

@ -6,31 +6,31 @@ import pkg/codexdht
import ./utils/version
let doc =
"""
Codex Network Crawler. Generates network metrics.
Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--dhtEnable=<e>] [--stepDelay=<ms>] [--revisitDelay=<m>] [--checkDelay=<m>] [--expiryDelay=<m>] [--marketplaceEnable=<e>] [--ethProvider=<a>] [--marketplaceAddress=<a>] [--requestCheckDelay=<m>]
Options:
--logLevel=<l> Sets log level [default: INFO]
--publicIp=<a> Public IP address where this instance is reachable.
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--dhtEnable=<e> Set to "1" to enable DHT crawler [default: 1]
--stepDelay=<ms> Delay in milliseconds per node visit [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
--expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
--marketplaceEnable=<e> Set to "1" to enable marketplace metrics [default: 1]
--ethProvider=<a> Address including http(s) or ws of the eth provider
--marketplaceAddress=<a> Eth address of Codex contracts deployment
--requestCheckDelay=<m> Delay in minutes after which storage contract status is (re)checked [default: 10]
"""
Codex Network Crawler. Generates network metrics.
Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--dhtEnable=<e>] [--stepDelay=<ms>] [--revisitDelay=<m>] [--checkDelay=<m>] [--expiryDelay=<m>] [--marketplaceEnable=<e>] [--ethProvider=<a>] [--marketplaceAddress=<a>] [--requestCheckDelay=<m>]
Options:
--logLevel=<l> Sets log level [default: INFO]
--publicIp=<a> Public IP address where this instance is reachable.
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--dhtEnable=<e> Set to "1" to enable DHT crawler [default: 1]
--stepDelay=<ms> Delay in milliseconds per node visit [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
--expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
--marketplaceEnable=<e> Set to "1" to enable marketplace metrics [default: 1]
--ethProvider=<a> Address including http(s) or ws of the eth provider
--marketplaceAddress=<a> Eth address of Codex contracts deployment
--requestCheckDelay=<m> Delay in minutes after which storage contract status is (re)checked [default: 10]
"""
import strutils

View File

@ -18,7 +18,9 @@ import ./components/chainmetrics
import ./components/chaincrawler
import ./components/requeststore
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
proc createComponents*(
state: State
): Future[?!seq[Component]] {.async: (raises: [CancelledError]).} =
var components: seq[Component] = newSeq[Component]()
let clock = createClock()

View File

@ -28,17 +28,22 @@ proc encode(s: Nid): seq[byte] =
s.toBytes()
proc decode(T: type Nid, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
try:
if bytes.len < 1:
return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
proc saveItem(
this: List, item: Nid
): Future[?!void] {.async: (raises: [CancelledError]).} =
without itemKey =? Key.init(this.name / $item), err:
return failure(err)
?await this.store.put(itemKey, item)
return success()
method load*(this: List): Future[?!void] {.async, base.} =
method load*(this: List): Future[?!void] {.async: (raises: [CancelledError]), base.} =
without queryKey =? Key.init(this.name), err:
return failure(err)
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:
@ -58,7 +63,9 @@ method load*(this: List): Future[?!void] {.async, base.} =
proc contains*(this: List, nid: Nid): bool =
this.items.anyIt(it == nid)
method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
method add*(
this: List, nid: Nid
): Future[?!void] {.async: (raises: [CancelledError]), base.} =
if this.contains(nid):
return success()
@ -69,7 +76,9 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
return success()
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
method remove*(
this: List, nid: Nid
): Future[?!void] {.async: (raises: [CancelledError]), base.} =
if not this.contains(nid):
return success()

View File

@ -75,15 +75,20 @@ method getNeighbors*(
except CatchableError as exc:
return failure(exc.msg)
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
proc findPeer*(
d: Dht, peerId: PeerId
): Future[?PeerRecord] {.async: (raises: [CancelledError]).} =
trace "protocol.resolve..."
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
try:
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
except CatchableError as exc:
error "CatchableError in protocol.resolve", err = exc.msg
return PeerRecord.none
method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
trace "Removing provider", peerId
@ -109,13 +114,19 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
method start*(d: Dht): Future[?!void] {.async.} =
d.protocol.open()
await d.protocol.start()
method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
d.protocol.open()
await d.protocol.start()
except CatchableError as exc:
return failure(exc.msg)
return success()
method stop*(d: Dht): Future[?!void] {.async.} =
await d.protocol.closeWait()
method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
await d.protocol.closeWait()
except CatchableError as exc:
return failure(exc.msg)
return success()
proc new(
@ -154,7 +165,7 @@ proc new(
self
proc createDht*(state: State): Future[?!Dht] {.async.} =
proc createDht*(state: State): Future[?!Dht] {.async: (raises: [CancelledError]).} =
without dhtStore =? createDatastore(state.config.dataDir / "dht"), err:
return failure(err)
let keyPath = state.config.dataDir / "privatekey"

View File

@ -50,7 +50,7 @@ proc fetchRequestInfo(
method subscribeToNewRequests*(
m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
proc resultWrapper(rid: Rid): Future[void] {.async.} =
proc resultWrapper(rid: Rid): Future[void] {.async: (raises: [CancelledError]).} =
let response = await onNewRequest(rid)
if error =? response.errorOption:
raiseAssert("Error result in handling of onNewRequest callback: " & error.msg)
@ -109,14 +109,19 @@ method getRequestInfo*(
else:
notStarted()
method awake*(m: MarketplaceService): Future[?!void] {.async.} =
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")
method awake*(
m: MarketplaceService
): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")
let marketplace = Marketplace.new(marketplaceAddress, provider)
m.market = some(OnChainMarket.new(marketplace))
return success()
let marketplace = Marketplace.new(marketplaceAddress, provider)
m.market = some(OnChainMarket.new(marketplace))
return success()
except JsonRpcProviderError as err:
return failure(err.msg)
proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =
return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock)

View File

@ -1,5 +1,5 @@
import pkg/contractabi
import pkg/ethers/fields
import pkg/ethers/contracts/fields
import pkg/questionable/results
export contractabi

View File

@ -89,6 +89,8 @@ template convertEthersError(body) =
body
except EthersError as error:
raiseMarketError(error.msgDetail)
except CatchableError as error:
raiseMarketError(error.msg)
proc loadConfig(
market: OnChainMarket
@ -121,7 +123,9 @@ proc config(
return resolvedConfig
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
proc approveFunds(
market: OnChainMarket, amount: UInt256
) {.async: (raises: [CancelledError]).} =
raiseAssert("Not available: approveFunds")
proc getZkeyHash*(
@ -138,43 +142,59 @@ proc periodicity*(
let period = config.proofs.period
return Periodicity(seconds: period)
proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async.} =
proc proofTimeout*(
market: OnChainMarket
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let config = await market.config()
return config.proofs.timeout
proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} =
proc repairRewardPercentage*(
market: OnChainMarket
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let config = await market.config()
return config.collateral.repairRewardPercentage
proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} =
proc requestDurationLimit*(
market: OnChainMarket
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let config = await market.config()
return config.requestDurationLimit
proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} =
proc proofDowntime*(
market: OnChainMarket
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let config = await market.config()
return config.proofs.downtime
proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} =
proc getPointer*(
market: OnChainMarket, slotId: SlotId
): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getPointer(slotId, overrides)
proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} =
proc myRequests*(
market: OnChainMarket
): Future[seq[RequestId]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return await market.contract.myRequests
proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} =
proc mySlots*(
market: OnChainMarket
): Future[seq[SlotId]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let slots = await market.contract.mySlots()
debug "Fetched my slots", numSlots = len(slots)
return slots
proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
proc requestStorage(
market: OnChainMarket, request: StorageRequest
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
debug "Requesting storage"
await market.approveFunds(request.totalPrice())
@ -182,7 +202,7 @@ proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
proc getRequest*(
market: OnChainMarket, id: RequestId
): Future[?StorageRequest] {.async.} =
): Future[?StorageRequest] {.async: (raises: [CancelledError, MarketError]).} =
let key = $id
convertEthersError:
@ -194,7 +214,7 @@ proc getRequest*(
proc requestState*(
market: OnChainMarket, requestId: RequestId
): Future[?RequestState] {.async.} =
): Future[?RequestState] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -202,22 +222,28 @@ proc requestState*(
except Marketplace_UnknownRequest:
return none RequestState
proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async.} =
proc slotState*(
market: OnChainMarket, slotId: SlotId
): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.slotState(slotId, overrides)
proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} =
proc getRequestEnd*(
market: OnChainMarket, id: RequestId
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return (await market.contract.requestEnd(id)).uint64
proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} =
proc requestExpiresAt*(
market: OnChainMarket, id: RequestId
): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return (await market.contract.requestExpiry(id)).uint64
proc getHost(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[?Address] {.async.} =
): Future[?Address] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let slotId = slotId(requestId, slotIndex)
let address = await market.contract.getHost(slotId)
@ -228,11 +254,13 @@ proc getHost(
proc currentCollateral*(
market: OnChainMarket, slotId: SlotId
): Future[UInt256] {.async.} =
): Future[UInt256] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return await market.contract.currentCollateral(slotId)
proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} =
proc getActiveSlot*(
market: OnChainMarket, slotId: SlotId
): Future[?Slot] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
try:
return some await market.contract.getActiveSlot(slotId)
@ -245,7 +273,7 @@ proc fillSlot(
slotIndex: uint64,
proof: Groth16Proof,
collateral: UInt256,
) {.async.} =
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
logScope:
requestId
@ -256,14 +284,20 @@ proc fillSlot(
discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1)
trace "fillSlot transaction completed"
proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
proc freeSlot*(
market: OnChainMarket, slotId: SlotId
) {.async: (raises: [CancelledError]).} =
raiseAssert("Not supported")
proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} =
proc withdrawFunds(
market: OnChainMarket, requestId: RequestId
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
discard await market.contract.withdrawFunds(requestId).confirm(1)
proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
proc isProofRequired*(
market: OnChainMarket, id: SlotId
): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -271,7 +305,9 @@ proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.}
except Marketplace_SlotIsFree:
return false
proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
proc willProofBeRequired*(
market: OnChainMarket, id: SlotId
): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -281,22 +317,26 @@ proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.asy
proc getChallenge*(
market: OnChainMarket, id: SlotId
): Future[ProofChallenge] {.async.} =
): Future[ProofChallenge] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getChallenge(id, overrides)
proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} =
proc submitProof*(
market: OnChainMarket, id: SlotId, proof: Groth16Proof
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
discard await market.contract.submitProof(id, proof).confirm(1)
proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} =
proc markProofAsMissing*(
market: OnChainMarket, id: SlotId, period: Period
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
discard await market.contract.markProofAsMissing(id, period).confirm(1)
proc canProofBeMarkedAsMissing*(
market: OnChainMarket, id: SlotId, period: Period
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
let provider = market.contract.provider
let contractWithoutSigner = market.contract.connect(provider)
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -309,7 +349,7 @@ proc canProofBeMarkedAsMissing*(
proc reserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
) {.async.} =
) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
discard await market.contract
.reserveSlot(
@ -322,13 +362,13 @@ proc reserveSlot*(
proc canReserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return await market.contract.canReserveSlot(requestId, slotIndex)
proc subscribeRequests*(
market: OnChainMarket, callback: OnRequest
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in Request subscription", msg = eventErr.msg
@ -342,7 +382,7 @@ proc subscribeRequests*(
proc subscribeSlotFilled*(
market: OnChainMarket, callback: OnSlotFilled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotFilled subscription", msg = eventErr.msg
@ -359,7 +399,7 @@ proc subscribeSlotFilled*(
requestId: RequestId,
slotIndex: uint64,
callback: OnSlotFilled,
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) =
if eventRequestId == requestId and eventSlotIndex == slotIndex:
callback(requestId, slotIndex)
@ -369,7 +409,7 @@ proc subscribeSlotFilled*(
proc subscribeSlotFreed*(
market: OnChainMarket, callback: OnSlotFreed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotFreed subscription", msg = eventErr.msg
@ -383,7 +423,7 @@ proc subscribeSlotFreed*(
proc subscribeSlotReservationsFull*(
market: OnChainMarket, callback: OnSlotReservationsFull
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotReservationsFull subscription",
@ -398,7 +438,7 @@ proc subscribeSlotReservationsFull*(
proc subscribeFulfillment(
market: OnChainMarket, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -412,7 +452,7 @@ proc subscribeFulfillment(
proc subscribeFulfillment(
market: OnChainMarket, requestId: RequestId, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -427,7 +467,7 @@ proc subscribeFulfillment(
proc subscribeRequestCancelled*(
market: OnChainMarket, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -441,7 +481,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestCancelled*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -456,7 +496,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestFailed*(
market: OnChainMarket, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -470,7 +510,7 @@ proc subscribeRequestFailed*(
proc subscribeRequestFailed*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -485,7 +525,7 @@ proc subscribeRequestFailed*(
proc subscribeProofSubmission*(
market: OnChainMarket, callback: OnProofSubmitted
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in ProofSubmitted subscription", msg = eventErr.msg
@ -497,18 +537,23 @@ proc subscribeProofSubmission*(
let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)
proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
await subscription.eventSubscription.unsubscribe()
proc unsubscribe*(
subscription: OnChainMarketSubscription
) {.async: (raises: [CancelledError, MarketError]).} =
try:
await subscription.eventSubscription.unsubscribe()
except ProviderError as err:
raiseMarketError(err.msg)
proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromBlock: BlockTag
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest)
proc queryPastSlotFilledEvents*(
market: OnChainMarket, blocksAgo: int
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -516,21 +561,21 @@ proc queryPastSlotFilledEvents*(
proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromTime: int64
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock))
proc queryPastStorageRequestedEvents*(
market: OnChainMarket, fromBlock: BlockTag
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
return
await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest)
proc queryPastStorageRequestedEvents*(
market: OnChainMarket, blocksAgo: int
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -538,7 +583,7 @@ proc queryPastStorageRequestedEvents*(
proc queryPastStorageRequestedEventsFromTime*(
market: OnChainMarket, fromTime: int64
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)

View File

@ -1,6 +1,6 @@
import pkg/stint
import pkg/contractabi
import pkg/ethers/fields
import pkg/ethers/contracts/fields
type
Groth16Proof* = object

View File

@ -1,2 +1,2 @@
These are copied from nim-codex v0.2.0.
There are plans to extract/refactor the contract interoperability code from nim-codex into its own submodule. But this isn't prioritized atm. So we're copying it here until that's been handled.
These are copied from nim-codex v0.2.0.
There are plans to extract/refactor the contract interoperability code from nim-codex into its own submodule. But this isn't prioritized atm. So we're copying it here until that's been handled.

View File

@ -3,7 +3,7 @@ import std/sequtils
import std/typetraits
import pkg/contractabi
import pkg/nimcrypto
import pkg/ethers/fields
import pkg/ethers/contracts/fields
import pkg/results
import pkg/questionable/results
import pkg/stew/byteutils

View File

@ -33,10 +33,12 @@ type
config*: Config
events*: Events
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
proc delayedWorkerStart(
s: State, step: OnStep, delay: Duration
) {.async: (raises: [CancelledError]).} =
await sleepAsync(1.seconds)
proc worker(): Future[void] {.async.} =
proc worker(): Future[void] {.async: (raises: [CancelledError]).} =
while s.status == ApplicationStatus.Running:
if err =? (await step()).errorOption:
error "Failure-result caught in main loop. Stopping...", err = err.msg
@ -45,7 +47,9 @@ proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
asyncSpawn worker()
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
method whileRunning*(
s: State, step: OnStep, delay: Duration
) {.async: (raises: []), base.} =
# We use a small delay before starting the workers because 'whileRunning' is likely called from
# component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling).
# Worker steps might start raising events that other components haven't had time to subscribe to yet.

View File

@ -15,7 +15,8 @@ type
queue: AsyncEventQueue[?T]
subscriptions: seq[AsyncDataEventSubscription]
AsyncDataEventHandler*[T] = proc(data: T): Future[?!void]
AsyncDataEventHandler*[T] =
proc(data: T): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
AsyncDataEvent[T](
@ -24,7 +25,7 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
proc performUnsubscribe[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
) {.async: (raises: [CancelledError]).} =
if subscription in event.subscriptions:
await subscription.listenFuture.cancelAndWait()
event.subscriptions.delete(event.subscriptions.find(subscription))
@ -40,15 +41,18 @@ proc subscribe*[T](
delayedUnsubscribe: false,
)
proc listener() {.async.} =
while true:
let items = await event.queue.waitEvents(subscription.key)
for item in items:
if data =? item:
subscription.inHandler = true
subscription.lastResult = (await handler(data))
subscription.inHandler = false
subscription.fireEvent.fire()
proc listener() {.async: (raises: [CancelledError]).} =
try:
while true:
let items = await event.queue.waitEvents(subscription.key)
for item in items:
if data =? item:
subscription.inHandler = true
subscription.lastResult = (await handler(data))
subscription.inHandler = false
subscription.fireEvent.fire()
except AsyncEventQueueFullError as err:
raiseAssert("AsyncEventQueueFullError in asyncdataevent.listener()")
subscription.listenFuture = listener()
@ -81,13 +85,15 @@ proc fire*[T](
proc unsubscribe*[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
) {.async: (raises: [CancelledError]).} =
if subscription.inHandler:
subscription.delayedUnsubscribe = true
else:
await event.performUnsubscribe(subscription)
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} =
proc unsubscribeAll*[T](
event: AsyncDataEvent[T]
) {.async: (raises: [CancelledError]).} =
let all = event.subscriptions
for subscription in all:
await event.unsubscribe(subscription)

View File

@ -1,36 +1,36 @@
# Variables
ARG BUILDER=codexstorage/nim-lang:2.0.14
ARG IMAGE=ubuntu:24.04
ARG BUILD_HOME=/src
ARG MAKE_PARALLEL=${MAKE_PARALLEL:-4}
ARG NIMFLAGS="${NIMFLAGS:-"-d:disableMarchNative"}"
ARG USE_LIBBACKTRACE=${USE_LIBBACKTRACE:-1}
ARG APP_HOME=/crawler
# Build
FROM ${BUILDER} AS builder
ARG BUILD_HOME
ARG MAKE_PARALLEL
ARG NIMFLAGS
ARG USE_LIBBACKTRACE
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y cmake build-essential
WORKDIR ${BUILD_HOME}
COPY . .
RUN nimble install nimble
RUN nimble build
# Create
FROM ${IMAGE}
ARG BUILD_HOME
ARG APP_HOME
WORKDIR ${APP_HOME}
COPY --from=builder ${BUILD_HOME}/build/* /usr/local/bin
COPY --from=builder --chmod=0755 ${BUILD_HOME}/docker/docker-entrypoint.sh /
RUN apt-get update && apt-get install -y libgomp1 curl jq && rm -rf /var/lib/apt/lists/*
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["codexcrawler"]
# Variables
ARG BUILDER=ubuntu:24.04
ARG IMAGE=${BUILDER}
ARG BUILD_HOME=/src
ARG MAKE_PARALLEL=${MAKE_PARALLEL:-4}
ARG NIMFLAGS="${NIMFLAGS:-"-d:disableMarchNative"}"
ARG USE_LIBBACKTRACE=${USE_LIBBACKTRACE:-1}
ARG APP_HOME=/crawler
# Build
FROM ${BUILDER} AS builder
ARG BUILD_HOME
ARG MAKE_PARALLEL
ARG NIMFLAGS
ARG USE_LIBBACKTRACE
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y git cmake curl make bash build-essential
WORKDIR ${BUILD_HOME}
COPY . .
RUN make -j ${MAKE_PARALLEL} update
RUN make -j ${MAKE_PARALLEL}
# Create
FROM ${IMAGE}
ARG BUILD_HOME
ARG APP_HOME
WORKDIR ${APP_HOME}
COPY --from=builder ${BUILD_HOME}/build/* /usr/local/bin
COPY --from=builder --chmod=0755 ${BUILD_HOME}/docker/docker-entrypoint.sh /
RUN apt-get update && apt-get install -y libgomp1 curl jq && rm -rf /var/lib/apt/lists/*
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["codexcrawler"]

View File

@ -1,20 +1,20 @@
services:
crawler1:
build:
context: ..
dockerfile: ./docker/crawler.Dockerfile
environment:
- CRAWLER_LOGLEVEL=TRACE
# - CRAWLER_PUBLICIP= Set to override CURL to ip.codex.storage
- CRAWLER_METRICSADDRESS=0.0.0.0
- CRAWLER_METRICSPORT=8008
- CRAWLER_DATADIR=crawler_data
- CRAWLER_DISCPORT=8090
- CRAWLER_BOOTNODES=testnet_sprs
- CRAWLER_STEPDELAY=3000
- CRAWLER_REVISITDELAY=1440
ports:
- 8008:8008/tcp # Metrics
- 8090:8090/udp # DHT discovery
volumes:
- ./crawler_data:/crawler_data:z
services:
crawler1:
build:
context: ..
dockerfile: ./docker/crawler.Dockerfile
environment:
- CRAWLER_LOGLEVEL=TRACE
# - CRAWLER_PUBLICIP= Set to override CURL to ip.codex.storage
- CRAWLER_METRICSADDRESS=0.0.0.0
- CRAWLER_METRICSPORT=8008
- CRAWLER_DATADIR=crawler_data
- CRAWLER_DISCPORT=8090
- CRAWLER_BOOTNODES=testnet_sprs
- CRAWLER_STEPDELAY=3000
- CRAWLER_REVISITDELAY=1440
ports:
- 8008:8008/tcp # Metrics
- 8090:8090/udp # DHT discovery
volumes:
- ./crawler_data:/crawler_data:z

7
env.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# We use ${BASH_SOURCE[0]} instead of $0 to allow sourcing this file
# and we fall back to a Zsh-specific special var to also support Zsh.
REL_PATH="$(dirname ${BASH_SOURCE[0]:-${(%):-%x}})"
ABS_PATH="$(cd ${REL_PATH}; pwd)"
source ${ABS_PATH}/vendor/nimbus-build-system/scripts/env.sh

View File

@ -34,8 +34,11 @@ suite "ChainMetrics":
teardown:
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.steppers[0]()).tryGet()
proc onStep() {.async: (raises: []).} =
try:
(await state.steppers[0]()).tryGet()
except CatchableError:
raiseAssert("CatchableError in onStep")
test "start should start stepper for config.requestCheckDelay minutes":
check:

View File

@ -35,8 +35,11 @@ suite "DhtCrawler":
teardown:
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.steppers[0]()).tryGet()
proc onStep() {.async: (raises: []).} =
try:
(await state.steppers[0]()).tryGet()
except CatchableError:
raiseAssert("CatchableError in onStep")
proc responsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: true, nodeIds: @[nid])
@ -68,7 +71,9 @@ suite "DhtCrawler":
test "nodes returned by getNeighbors are raised as nodesFound":
var nodesFound = newSeq[Nid]()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNodesFound(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
nodesFound = nids
return success()
@ -86,7 +91,9 @@ suite "DhtCrawler":
test "responsive result from getNeighbors raises the node as successful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
proc onCheck(
event: DhtNodeCheckEventData
): Future[?!void] {.async: (raises: [CancelledError]).} =
checkEvent = event
return success()
@ -105,7 +112,9 @@ suite "DhtCrawler":
test "unresponsive result from getNeighbors raises the node as unsuccessful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
proc onCheck(
event: DhtNodeCheckEventData
): Future[?!void] {.async: (raises: [CancelledError]).} =
checkEvent = event
return success()

View File

@ -35,13 +35,18 @@ suite "DhtMetrics":
(await dhtmetrics.stop()).tryGet()
state.checkAllUnsubscribed()
proc fireDhtNodeCheckEvent(isOk: bool) {.async.} =
proc fireDhtNodeCheckEvent(isOk: bool) {.async: (raises: []).} =
let event = DhtNodeCheckEventData(id: nid, isOk: isOk)
try:
(await state.events.dhtNodeCheck.fire(event)).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireDhtNodeCheckEvent")
(await state.events.dhtNodeCheck.fire(event)).tryGet()
proc fireNodesDeletedEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesDeleted.fire(nids)).tryGet()
proc fireNodesDeletedEvent(nids: seq[Nid]) {.async: (raises: []).} =
try:
(await state.events.nodesDeleted.fire(nids)).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireNodesDeletedEvent")
test "dhtmetrics start should load both lists":
check:

View File

@ -38,11 +38,17 @@ suite "Nodestore":
state.checkAllUnsubscribed()
removeDir(dsPath)
proc fireNodeFoundEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesFound.fire(nids)).tryGet()
proc fireNodeFoundEvent(nids: seq[Nid]) {.async: (raises: []).} =
try:
(await state.events.nodesFound.fire(nids)).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireNodeFoundEvent")
proc fireCheckEvent(nid: Nid, isOk: bool) {.async.} =
(await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: isOk))).tryGet()
proc fireCheckEvent(nid: Nid, isOk: bool) {.async: (raises: []).} =
try:
(await state.events.dhtNodeCheck.fire(DhtNodeCheckEventData(id: nid, isOk: isOk))).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireCheckEvent")
test "nodeEntry encoding":
let entry =
@ -73,7 +79,9 @@ suite "Nodestore":
test "nodesFound event should fire newNodesDiscovered":
var newNodes = newSeq[Nid]()
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNewNodes(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
newNodes = nids
return success()
@ -97,7 +105,9 @@ suite "Nodestore":
var
newNodes = newSeq[Nid]()
count = 0
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNewNodes(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
newNodes = nids
inc count
return success()
@ -175,7 +185,9 @@ suite "Nodestore":
test "deleteEntries fires nodesDeleted event":
var deletedNodes = newSeq[Nid]()
proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} =
proc onDeleted(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
deletedNodes = nids
return success()
@ -245,7 +257,9 @@ suite "Nodestore":
test "dhtNodeCheck event for non-existing node should fire nodesDeleted":
var deletedNodes = newSeq[Nid]()
proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} =
proc onDeleted(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
deletedNodes = nids
return success()

View File

@ -37,7 +37,9 @@ suite "TimeTracker":
# Subscribe to nodesToRevisit event
nodesToRevisitReceived = newSeq[Nid]()
proc onToRevisit(nids: seq[Nid]): Future[?!void] {.async.} =
proc onToRevisit(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
nodesToRevisitReceived = nids
return success()
@ -53,11 +55,17 @@ suite "TimeTracker":
await state.events.nodesToRevisit.unsubscribe(sub)
state.checkAllUnsubscribed()
proc onStepCheck() {.async.} =
(await state.steppers[0]()).tryGet()
proc onStepCheck() {.async: (raises: []).} =
try:
(await state.steppers[0]()).tryGet()
except CatchableError:
raiseAssert("CatchableError in onStepCheck")
proc onStepRt() {.async.} =
(await state.steppers[1]()).tryGet()
proc onStepRt() {.async: (raises: []).} =
try:
(await state.steppers[1]()).tryGet()
except CatchableError:
raiseAssert("CatchableError in onStepRt")
proc createNodeInStore(lastVisit: uint64, firstInactive = 0.uint64): Nid =
let entry =
@ -120,7 +128,9 @@ suite "TimeTracker":
test "onStep raises routingTable nodes as nodesFound":
var nodesFound = newSeq[Nid]()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNodesFound(
nids: seq[Nid]
): Future[?!void] {.async: (raises: [CancelledError]).} =
nodesFound = nids
return success()

View File

@ -29,11 +29,17 @@ suite "TodoList":
(await todo.stop()).tryGet()
state.checkAllUnsubscribed()
proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} =
(await state.events.newNodesDiscovered.fire(nids)).tryGet()
proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async: (raises: []).} =
try:
(await state.events.newNodesDiscovered.fire(nids)).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireNewNodesDiscoveredEvent")
proc fireNodesToRevisitEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesToRevisit.fire(nids)).tryGet()
proc fireNodesToRevisitEvent(nids: seq[Nid]) {.async: (raises: []).} =
try:
(await state.events.nodesToRevisit.fire(nids)).tryGet()
except CatchableError:
raiseAssert("CatchableError in fireNodesToRevisitEvent")
test "discovered nodes are added to todo list":
await fireNewNodesDiscoveredEvent(@[nid])

View File

@ -12,17 +12,21 @@ type MockList* = ref object of List
removeSuccess*: bool
length*: int
method load*(this: MockList): Future[?!void] {.async.} =
method load*(this: MockList): Future[?!void] {.async: (raises: [CancelledError]).} =
this.loadCalled = true
return success()
method add*(this: MockList, nid: Nid): Future[?!void] {.async.} =
method add*(
this: MockList, nid: Nid
): Future[?!void] {.async: (raises: [CancelledError]).} =
this.added.add(nid)
if this.addSuccess:
return success()
return failure("test failure")
method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} =
method remove*(
this: MockList, nid: Nid
): Future[?!void] {.async: (raises: [CancelledError]).} =
this.removed.add(nid)
if this.removeSuccess:
return success()

View File

@ -15,7 +15,9 @@ proc checkAllUnsubscribed*(s: MockState) =
s.events.dhtNodeCheck.listeners == 0
s.events.nodesToRevisit.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
method whileRunning*(
s: MockState, step: OnStep, delay: Duration
) {.async: (raises: []).} =
s.steppers.add(step)
s.delays.add(delay)

View File

@ -20,7 +20,9 @@ suite "AsyncDataEvent":
test "Successful event":
var data = ""
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data = e.s
success()
@ -34,7 +36,9 @@ suite "AsyncDataEvent":
test "Multiple events":
var counter = 0
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
inc counter
success()
@ -54,15 +58,21 @@ suite "AsyncDataEvent":
data1 = ""
data2 = ""
data3 = ""
proc eventHandler1(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler1(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data1 = e.s
success()
proc eventHandler2(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler2(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data2 = e.s
success()
proc eventHandler3(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler3(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data3 = e.s
success()
@ -82,7 +92,9 @@ suite "AsyncDataEvent":
await event.unsubscribe(sub3)
test "Failed event preserves error message":
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
failure(msg)
let s = event.subscribe(eventHandler)
@ -100,15 +112,21 @@ suite "AsyncDataEvent":
data2 = ""
data3 = ""
proc handler1(e: ExampleData): Future[?!void] {.async.} =
proc handler1(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data1 = e.s
success()
proc handler2(e: ExampleData): Future[?!void] {.async.} =
proc handler2(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data2 = e.s
success()
proc handler3(e: ExampleData): Future[?!void] {.async.} =
proc handler3(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
data3 = e.s
success()
@ -134,18 +152,20 @@ suite "AsyncDataEvent":
isOK(await event.fire(ExampleData(s: msg)))
test "Can unsubscribe in handler":
proc doNothing() {.async, closure.} =
proc doNothing() {.async: (raises: [CancelledError]), closure.} =
await sleepAsync(1.millis)
var callback = doNothing
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
proc eventHandler(
e: ExampleData
): Future[?!void] {.async: (raises: [CancelledError]).} =
await callback()
success()
let s = event.subscribe(eventHandler)
proc doUnsubscribe() {.async.} =
proc doUnsubscribe() {.async: (raises: [CancelledError]).} =
await event.unsubscribe(s)
callback = doUnsubscribe

View File

@ -11,4 +11,4 @@ requires "asynctest >= 0.5.2 & < 0.6.0"
requires "unittest2 <= 0.3.0"
task test, "Run tests":
exec "nim c -r test.nim"
exec "nim c -r testCodexCrawler.nim"

1
vendor/asynctest vendored Submodule

@ -0,0 +1 @@
Subproject commit 572c897a4e1177e905105a3bafe8c5573d9bae83

1
vendor/codex-contracts-eth vendored Submodule

@ -0,0 +1 @@
Subproject commit 0bf138512b7c1c3b8d77c48376e47f702e47106c

1
vendor/constantine vendored Submodule

@ -0,0 +1 @@
Subproject commit 8d6a6a38b90fb8ee3ec2230839773e69aab36d80

1
vendor/lrucache.nim vendored Submodule

@ -0,0 +1 @@
Subproject commit 8767ade0b76ea5b5d4ce24a52d0c58a6ebeb66cd

1
vendor/nim-bearssl vendored Submodule

@ -0,0 +1 @@
Subproject commit 667b40440a53a58e9f922e29e20818720c62d9ac

1
vendor/nim-blscurve vendored Submodule

@ -0,0 +1 @@
Subproject commit de2d3c79264bba18dbea469c8c5c4b3bb3c8bc55

1
vendor/nim-chronicles vendored Submodule

@ -0,0 +1 @@
Subproject commit 81a4a7a360c78be9c80c8f735c76b6d4a1517304

1
vendor/nim-chronos vendored Submodule

@ -0,0 +1 @@
Subproject commit c04576d829b8a0a1b12baaa8bc92037501b3a4a0

1
vendor/nim-codex-dht vendored Submodule

@ -0,0 +1 @@
Subproject commit f6eef1ac95c70053b2518f1e3909c909ed8701a6

1
vendor/nim-confutils vendored Submodule

@ -0,0 +1 @@
Subproject commit cb858a27f4347be949d10ed74b58713d687936d2

1
vendor/nim-contract-abi vendored Submodule

@ -0,0 +1 @@
Subproject commit 842f48910be4f388bcbf8abf1f02aba1d5e2ee64

1
vendor/nim-datastore vendored Submodule

@ -0,0 +1 @@
Subproject commit 5778e373fa97286f389e0aef61f1e8f30a934dab

1
vendor/nim-docopt vendored Submodule

@ -0,0 +1 @@
Subproject commit efaa112b6df172a9168c4eb581ab8dda1fbcfe2a

1
vendor/nim-eth vendored Submodule

@ -0,0 +1 @@
Subproject commit dcfbc4291d39b59563828c3e32be4d51a2f25931

1
vendor/nim-ethers vendored Submodule

@ -0,0 +1 @@
Subproject commit bbced4673316763c6ef931b4d0a08069cde2474c

1
vendor/nim-faststreams vendored Submodule

@ -0,0 +1 @@
Subproject commit cf8d4d22636b8e514caf17e49f9c786ac56b0e85

1
vendor/nim-http-utils vendored Submodule

@ -0,0 +1 @@
Subproject commit 8bb1acbaa4b86eb866145b0d468eff64a57d1897

1
vendor/nim-json-rpc vendored Submodule

@ -0,0 +1 @@
Subproject commit 274372132de497e6b7b793c9d5d5474b71bf80a2

1
vendor/nim-json-serialization vendored Submodule

@ -0,0 +1 @@
Subproject commit 6eadb6e939ffa7882ff5437033c11a9464d3385c

1
vendor/nim-leveldbstatic vendored Submodule

@ -0,0 +1 @@
Subproject commit 378ef63e261e3b5834a3567404edc3ce838498b3

1
vendor/nim-libbacktrace vendored Submodule

@ -0,0 +1 @@
Subproject commit 6da0cda88ab7780bd5fd342327adb91ab84692aa

1
vendor/nim-libp2p vendored Submodule

@ -0,0 +1 @@
Subproject commit c08d80734989b028b3d1705f2188d783a343aac0

1
vendor/nim-metrics vendored Submodule

@ -0,0 +1 @@
Subproject commit cacfdc12454a0804c65112b9f4f50d1375208dcd

1
vendor/nim-nat-traversal vendored Submodule

@ -0,0 +1 @@
Subproject commit 6508ce75060878dfcdfa21f94721672c69a1823b

1
vendor/nim-ngtcp2 vendored Submodule

@ -0,0 +1 @@
Subproject commit 6834f4756b6af58356ac9c4fef3d71db3c3ae5fe

1
vendor/nim-poseidon2 vendored Submodule

@ -0,0 +1 @@
Subproject commit 4e2c6e619b2f2859aaa4b2aed2f346ea4d0c67a3

1
vendor/nim-protobuf-serialization vendored Submodule

@ -0,0 +1 @@
Subproject commit 5a31137a82c2b6a989c9ed979bb636c7a49f570e

1
vendor/nim-quic vendored Submodule

@ -0,0 +1 @@
Subproject commit ddcb31ffb74b5460ab37fd13547eca90594248bc

1
vendor/nim-regex vendored Submodule

@ -0,0 +1 @@
Subproject commit 88f634b95651ce0d53961668be1c414341429b7f

1
vendor/nim-results vendored Submodule

@ -0,0 +1 @@
Subproject commit df8113dda4c2d74d460a8fa98252b0b771bf1f27

1
vendor/nim-secp256k1 vendored Submodule

@ -0,0 +1 @@
Subproject commit 2acbbdcc0e63002a013fff49f015708522875832

1
vendor/nim-serde vendored Submodule

@ -0,0 +1 @@
Subproject commit 5ced7c88b97d99c582285ce796957fb71fd42434

1
vendor/nim-serialization vendored Submodule

@ -0,0 +1 @@
Subproject commit 2086c99608b4bf472e1ef5fe063710f280243396

1
vendor/nim-sqlite3-abi vendored Submodule

@ -0,0 +1 @@
Subproject commit 05bbff1af4e8fe2d972ba4b0667b89ca94d3ebba

1
vendor/nim-stew vendored Submodule

@ -0,0 +1 @@
Subproject commit a6e198132097fb544d04959aeb3b839e1408f942

1
vendor/nim-taskpools vendored Submodule

@ -0,0 +1 @@
Subproject commit 66585e2e960b7695e48ea60377fb3aeac96406e8

1
vendor/nim-testutils vendored Submodule

@ -0,0 +1 @@
Subproject commit 4d37244f9f5e1acd8592a4ceb5c3fc47bc160181

1
vendor/nim-unicodedb vendored Submodule

@ -0,0 +1 @@
Subproject commit 66f2458710dc641dd4640368f9483c8a0ec70561

1
vendor/nim-unittest2 vendored Submodule

@ -0,0 +1 @@
Subproject commit 845b6af28b9f68f02d320e03ad18eccccea7ddb9

1
vendor/nim-websock vendored Submodule

@ -0,0 +1 @@
Subproject commit ebe308a79a7b440a11dfbe74f352be86a3883508

1
vendor/nim-zlib vendored Submodule

@ -0,0 +1 @@
Subproject commit 91cf360b1aeb2e0c753ff8bac6de22a41c5ed8cd

1
vendor/nimbus-build-system vendored Submodule

@ -0,0 +1 @@
Subproject commit 0be0663e1af76e869837226a4ef3e586fcc737d3

1
vendor/nimcrypto vendored Submodule

@ -0,0 +1 @@
Subproject commit dc07e3058c6904eef965394493b6ea99aa2adefc

1
vendor/nph vendored Submodule

@ -0,0 +1 @@
Subproject commit f1f047760c6cb38d5c55d0ddb29b57a9c008a976

1
vendor/questionable vendored Submodule

@ -0,0 +1 @@
Subproject commit 47692e0d923ada8f7f731275b2a87614c0150987

1
vendor/stint vendored Submodule

@ -0,0 +1 @@
Subproject commit 5c5e01cef089a261474b7abfe246b37447aaa8ed

1
vendor/upraises vendored Submodule

@ -0,0 +1 @@
Subproject commit bc2628989b63854d980e92dadbd58f83e34b6f25