diff --git a/.github/workflows/windows-build.yml b/.github/workflows/windows-build.yml index 0582d5fd1..52cd7f91a 100644 --- a/.github/workflows/windows-build.yml +++ b/.github/workflows/windows-build.yml @@ -81,9 +81,13 @@ jobs: make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1 cd ../../../../ - - name: Building wakunode2 + - name: Building wakunode2.exe run: | make wakunode2 LOG_LEVEL=DEBUG V=3 -j8 + + - name: Building libwaku.dll + run: | + make libwaku STATIC=0 LOG_LEVEL=DEBUG V=1 -j - name: Check Executable run: | @@ -93,3 +97,9 @@ jobs: echo "Build failed: wakunode2.exe not found" exit 1 fi + if [ -f "./build/libwaku.dll" ]; then + echo "libwaku.dll build successful" + else + echo "Build failed: libwaku.dll not found" + exit 1 + fi diff --git a/Makefile b/Makefile index ae57852a5..555a20472 100644 --- a/Makefile +++ b/Makefile @@ -112,11 +112,8 @@ ifeq (, $(shell which cargo)) curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable endif -anvil: rustup -ifeq (, $(shell which anvil 2> /dev/null)) -# Install Anvil if it's not installed - ./scripts/install_anvil.sh -endif +rln-deps: rustup + ./scripts/install_rln_tests_dependencies.sh deps: | deps-common nat-libs waku.nims @@ -205,8 +202,8 @@ testcommon: | build deps ########## .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester -# install anvil only for the testwaku target -testwaku: | build deps anvil librln +# install rln-deps only for the testwaku target +testwaku: | build deps rln-deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims @@ -401,14 +398,16 @@ docker-liteprotocoltester-push: STATIC ?= 0 + libwaku: | build deps librln - rm -f build/libwaku* + rm -f build/libwaku* + ifeq ($(STATIC), 1) - echo -e $(BUILD_MSG) "build/$@.a" && \ - $(ENV_SCRIPT) nim libwakuStatic $(NIM_PARAMS) waku.nims + echo -e $(BUILD_MSG) "build/$@.a" && $(ENV_SCRIPT) nim libwakuStatic $(NIM_PARAMS) waku.nims +else ifeq ($(detected_OS),Windows) + echo -e $(BUILD_MSG) "build/$@.dll" && $(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims else - echo -e $(BUILD_MSG) "build/$@.so" && \ - $(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims + echo -e $(BUILD_MSG) "build/$@.so" && $(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims endif ##################### diff --git a/README.md b/README.md index 057d0b622..119c00052 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,7 @@ pacman -S --noconfirm --needed mingw-w64-x86_64-python #### 3. Build Wakunode - Open Git Bash as administrator - clone nwaku and cd nwaku -- Execute: `./scripts/build_wakunode_windows.sh` +- Execute: `./scripts/build_windows.sh` #### 4. Troubleshooting If `wakunode2.exe` isn't generated: diff --git a/apps/sonda/docker-compose.yml b/apps/sonda/docker-compose.yml index c6235ef32..d6594428e 100644 --- a/apps/sonda/docker-compose.yml +++ b/apps/sonda/docker-compose.yml @@ -9,7 +9,7 @@ x-logging: &logging x-rln-relay-eth-client-address: &rln_relay_eth_client_address ${RLN_RELAY_ETH_CLIENT_ADDRESS:-} # Add your RLN_RELAY_ETH_CLIENT_ADDRESS after the "-" x-rln-environment: &rln_env - RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8} + RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xB9cd878C90E49F797B4431fBF4fb333108CB90e6} RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-" RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-" diff --git a/apps/sonda/register_rln.sh b/apps/sonda/register_rln.sh index aca1007a8..4fb373b3a 100755 --- a/apps/sonda/register_rln.sh +++ b/apps/sonda/register_rln.sh @@ -24,7 +24,7 @@ fi docker run -v $(pwd)/keystore:/keystore/:Z harbor.status.im/wakuorg/nwaku:v0.30.1 generateRlnKeystore \ --rln-relay-eth-client-address=${RLN_RELAY_ETH_CLIENT_ADDRESS} \ --rln-relay-eth-private-key=${ETH_TESTNET_KEY} \ ---rln-relay-eth-contract-address=0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8 \ +--rln-relay-eth-contract-address=0xB9cd878C90E49F797B4431fBF4fb333108CB90e6 \ --rln-relay-cred-path=/keystore/keystore.json \ --rln-relay-cred-password="${RLN_RELAY_CRED_PASSWORD}" \ --rln-relay-user-message-limit=20 \ diff --git a/library/events/json_waku_not_responding_event.nim b/library/events/json_waku_not_responding_event.nim new file mode 100644 index 000000000..1e1d5fcc5 --- /dev/null +++ b/library/events/json_waku_not_responding_event.nim @@ -0,0 +1,9 @@ +import system, std/json, ./json_base_event + +type JsonWakuNotRespondingEvent* = ref object of JsonEvent + +proc new*(T: type JsonWakuNotRespondingEvent): T = + return JsonWakuNotRespondingEvent(eventType: "waku_not_responding") + +method `$`*(event: JsonWakuNotRespondingEvent): string = + $(%*event) diff --git a/library/libwaku.h b/library/libwaku.h index 525fec69a..b5d6c9bab 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -45,6 +45,8 @@ int waku_version(void* ctx, WakuCallBack callback, void* userData); +// Sets a callback that will be invoked whenever an event occurs. +// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe. void waku_set_event_callback(void* ctx, WakuCallBack callback, void* userData); diff --git a/library/libwaku.nim b/library/libwaku.nim index 3e4431411..bc1614af8 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -15,8 +15,7 @@ import waku/waku_core/topics/pubsub_topic, waku/waku_core/subscription/push_handler, waku/waku_relay, - ./events/ - [json_message_event, json_topic_health_change_event, json_connection_change_event], + ./events/json_message_event, ./waku_thread/waku_thread, ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request, @@ -48,25 +47,6 @@ template checkLibwakuParams*( if isNil(callback): return RET_MISSING_CALLBACK -template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) = - if isNil(ctx[].eventCallback): - error eventName & " - eventCallback is nil" - return - - foreignThreadGc: - try: - let event = body - cast[WakuCallBack](ctx[].eventCallback)( - RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData - ) - except Exception, CatchableError: - let msg = - "Exception " & eventName & " when calling 'eventCallBack': " & - getCurrentExceptionMsg() - cast[WakuCallBack](ctx[].eventCallback)( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData - ) - proc handleRequest( ctx: ptr WakuContext, requestType: RequestType, @@ -81,21 +61,6 @@ proc handleRequest( return RET_OK -proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler = - return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} = - callEventCallback(ctx, "onConnectionChange"): - $JsonConnectionChangeEvent.new($peerId, peerEvent) - -proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = - return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = - callEventCallback(ctx, "onReceivedMessage"): - $JsonMessageEvent.new(pubsubTopic, msg) - -proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler = - return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} = - callEventCallback(ctx, "onTopicHealthChange"): - $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) - ### End of not-exported components ################################################################################ @@ -146,8 +111,8 @@ proc waku_new( return nil ## Create the Waku thread that will keep waiting for req from the main thread. - var ctx = waku_thread.createWakuThread().valueOr: - let msg = "Error in createWakuThread: " & $error + var ctx = waku_thread.createWakuContext().valueOr: + let msg = "Error in createWakuContext: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil @@ -180,7 +145,7 @@ proc waku_destroy( initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread.destroyWakuThread(ctx).isOkOr: + waku_thread.destroyWakuContext(ctx).isOkOr: let msg = "libwaku error: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR diff --git a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim index 4ab8914ee..0bd9235b6 100644 --- a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim @@ -18,6 +18,7 @@ type DebugNodeMsgType* = enum RETRIEVE_MY_PEER_ID RETRIEVE_METRICS RETRIEVE_ONLINE_STATE + CHECK_WAKU_NOT_BLOCKED type DebugNodeRequest* = object operation: DebugNodeMsgType @@ -55,6 +56,8 @@ proc process*( return ok(getMetrics()) of RETRIEVE_ONLINE_STATE: return ok($waku.healthMonitor.onlineMonitor.amIOnline()) + of CHECK_WAKU_NOT_BLOCKED: + return ok("waku thread is not blocked") error "unsupported operation in DebugNodeRequest" return err("unsupported operation in DebugNodeRequest") diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 640389e32..37c37e6df 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -4,10 +4,22 @@ import std/[options, atomics, os, net, locks] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types +import + waku/factory/waku, + waku/node/peer_manager, + waku/waku_relay/[protocol, topic_health], + waku/waku_core/[topics/pubsub_topic, message], + ./inter_thread_communication/[waku_thread_request, requests/debug_node_request], + ../ffi_types, + ../events/[ + json_message_event, json_topic_health_change_event, json_connection_change_event, + json_waku_not_responding_event, + ] type WakuContext* = object - thread: Thread[(ptr WakuContext)] + wakuThread: Thread[(ptr WakuContext)] + watchdogThread: Thread[(ptr WakuContext)] + # monitors the Waku thread and notifies the Waku SDK consumer if it hangs lock: Lock reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr @@ -17,78 +29,48 @@ type WakuContext* = object userData*: pointer eventCallback*: pointer eventUserdata*: pointer - running: Atomic[bool] # To control when the thread is running + running: Atomic[bool] # To control when the threads are running const git_version* {.strdefine.} = "n/a" const versionString = "version / git commit hash: " & waku.git_version -proc runWaku(ctx: ptr WakuContext) {.async.} = - ## This is the worker body. This runs the Waku node - ## and attends library user requests (stop, connect_to, etc.) +template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) = + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return - var waku: Waku + foreignThreadGc: + try: + let event = body + cast[WakuCallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[WakuCallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) - while true: - await ctx.reqSignal.wait() +proc onConnectionChange*(ctx: ptr WakuContext): ConnectionChangeHandler = + return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} = + callEventCallback(ctx, "onConnectionChange"): + $JsonConnectionChangeEvent.new($peerId, peerEvent) - if ctx.running.load == false: - break +proc onReceivedMessage*(ctx: ptr WakuContext): WakuRelayHandler = + return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = + callEventCallback(ctx, "onReceivedMessage"): + $JsonMessageEvent.new(pubsubTopic, msg) - ## Trying to get a request from the libwaku requestor thread - var request: ptr WakuThreadRequest - let recvOk = ctx.reqChannel.tryRecv(request) - if not recvOk: - error "waku thread could not receive a request" - continue +proc onTopicHealthChange*(ctx: ptr WakuContext): TopicHealthChangeHandler = + return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} = + callEventCallback(ctx, "onTopicHealthChange"): + $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - ## Handle the request - asyncSpawn WakuThreadRequest.process(request, addr waku) - -proc run(ctx: ptr WakuContext) {.thread.} = - ## Launch waku worker - waitFor runWaku(ctx) - -proc createWakuThread*(): Result[ptr WakuContext, string] = - ## This proc is called from the main thread and it creates - ## the Waku working thread. - var ctx = createShared(WakuContext, 1) - ctx.reqSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqSignal ThreadSignalPtr") - ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqReceivedSignal ThreadSignalPtr") - ctx.lock.initLock() - - ctx.running.store(true) - - try: - createThread(ctx.thread, run, ctx) - except ValueError, ResourceExhaustedError: - # and freeShared for typed allocations! - freeShared(ctx) - - return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) - - return ok(ctx) - -proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = - ctx.running.store(false) - - let signaledOnTime = ctx.reqSignal.fireSync().valueOr: - return err("error in destroyWakuThread: " & $error) - if not signaledOnTime: - return err("failed to signal reqSignal on time in destroyWakuThread") - - joinThread(ctx.thread) - ctx.lock.deinitLock() - ?ctx.reqSignal.close() - ?ctx.reqReceivedSignal.close() - freeShared(ctx) - - return ok() +proc onWakuNotResponding*(ctx: ptr WakuContext) = + callEventCallback(ctx, "onWakuNotResponsive"): + $JsonWakuNotRespondingEvent.new() proc sendRequestToWakuThread*( ctx: ptr WakuContext, @@ -96,16 +78,17 @@ proc sendRequestToWakuThread*( reqContent: pointer, callback: WakuCallBack, userData: pointer, + timeout = InfiniteDuration, ): Result[void, string] = - let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) - + ctx.lock.acquire() # This lock is only necessary while we use a SP Channel and while the signalling # between threads assumes that there aren't concurrent requests. # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive # requests concurrently and spare us the need of locks - ctx.lock.acquire() defer: ctx.lock.release() + + let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: @@ -122,11 +105,115 @@ proc sendRequestToWakuThread*( return err("Couldn't fireSync in time") ## wait until the Waku Thread properly received the request - let res = ctx.reqReceivedSignal.waitSync() + let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): deallocShared(req) return err("Couldn't receive reqReceivedSignal signal") ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the - ## process proc. + ## process proc. See the 'waku_thread_request.nim' module for more details. ok() + +proc watchdogThreadBody(ctx: ptr WakuContext) {.thread.} = + ## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs. + + let watchdogRun = proc(ctx: ptr WakuContext) {.async.} = + const WatchdogTimeinterval = 1.seconds + const WakuNotRespondingTimeout = 3.seconds + while true: + await sleepAsync(WatchdogTimeinterval) + + if ctx.running.load == false: + debug "Watchdog thread exiting because WakuContext is not running" + break + + let wakuCallback = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer + ) {.cdecl, gcsafe, raises: [].} = + discard ## Don't do anything. Just respecting the callback signature. + const nilUserData = nil + + trace "Sending watchdog request to Waku thread" + + sendRequestToWakuThread( + ctx, + RequestType.DEBUG, + DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED), + wakuCallback, + nilUserData, + WakuNotRespondingTimeout, + ).isOkOr: + error "Failed to send watchdog request to Waku thread", error = $error + onWakuNotResponding(ctx) + + waitFor watchdogRun(ctx) + +proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} = + ## Waku thread that attends library user requests (stop, connect_to, etc.) + + let wakuRun = proc(ctx: ptr WakuContext) {.async.} = + var waku: Waku + while true: + await ctx.reqSignal.wait() + + if ctx.running.load == false: + break + + ## Trying to get a request from the libwaku requestor thread + var request: ptr WakuThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "waku thread could not receive a request" + continue + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Handle the request + asyncSpawn WakuThreadRequest.process(request, addr waku) + + waitFor wakuRun(ctx) + +proc createWakuContext*(): Result[ptr WakuContext, string] = + ## This proc is called from the main thread and it creates + ## the Waku working thread. + var ctx = createShared(WakuContext, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() + + ctx.running.store(true) + + try: + createThread(ctx.wakuThread, wakuThreadBody, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) + + try: + createThread(ctx.watchdogThread, watchdogThreadBody, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + + return ok(ctx) + +proc destroyWakuContext*(ctx: ptr WakuContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroyWakuContext: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroyWakuContext") + + joinThread(ctx.wakuThread) + joinThread(ctx.watchdogThread) + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() diff --git a/scripts/build_wakunode_windows.sh b/scripts/build_windows.sh similarity index 88% rename from scripts/build_wakunode_windows.sh rename to scripts/build_windows.sh index ef0881836..e56fb8871 100755 --- a/scripts/build_wakunode_windows.sh +++ b/scripts/build_windows.sh @@ -36,25 +36,28 @@ cd ../../../.. echo "6. -.-.-.- Building libunwind -.-.-.-" cd vendor/nim-libbacktrace -execute_command "make all V=1" -execute_command "make install/usr/lib/libunwind.a V=1" +execute_command "make all V=1 -j8" +execute_command "make install/usr/lib/libunwind.a V=1 -j8" cp ./vendor/libunwind/build/lib/libunwind.a install/usr/lib cd ../../ echo "7. -.-.-.- Building miniupnpc -.-.-.- " cd vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc execute_command "git checkout little_chore_windows_support" -execute_command "make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1" +execute_command "make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1 -j8" cd ../../../../.. echo "8. -.-.-.- Building libnatpmp -.-.-.- " cd ./vendor/nim-nat-traversal/vendor/libnatpmp-upstream -make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1 +make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1 -j8 cd ../../../../ echo "9. -.-.-.- Building wakunode2 -.-.-.- " execute_command "make wakunode2 LOG_LEVEL=DEBUG V=1 -j8" +echo "10. -.-.-.- Building libwaku -.-.-.- " +execute_command "make libwaku STATIC=0 LOG_LEVEL=DEBUG V=1 -j8" + echo "Windows setup completed successfully!" echo "✓ Successful commands: $success_count" echo "✗ Failed commands: $failure_count" diff --git a/scripts/install_anvil.sh b/scripts/install_anvil.sh index 13d5f8dfd..1bf4bd7b1 100755 --- a/scripts/install_anvil.sh +++ b/scripts/install_anvil.sh @@ -2,13 +2,14 @@ # Install Anvil +if ! command -v anvil &> /dev/null; then + BASE_DIR="${XDG_CONFIG_HOME:-$HOME}" + FOUNDRY_DIR="${FOUNDRY_DIR:-"$BASE_DIR/.foundry"}" + FOUNDRY_BIN_DIR="$FOUNDRY_DIR/bin" -BASE_DIR="${XDG_CONFIG_HOME:-$HOME}" -FOUNDRY_DIR="${FOUNDRY_DIR-"$BASE_DIR/.foundry"}" -FOUNDRY_BIN_DIR="$FOUNDRY_DIR/bin" - -curl -L https://foundry.paradigm.xyz | bash -# Extract the source path from the download result -echo "foundryup_path: $FOUNDRY_BIN_DIR" -# run foundryup -$FOUNDRY_BIN_DIR/foundryup \ No newline at end of file + curl -L https://foundry.paradigm.xyz | bash + # Extract the source path from the download result + echo "foundryup_path: $FOUNDRY_BIN_DIR" + # run foundryup + $FOUNDRY_BIN_DIR/foundryup +fi \ No newline at end of file diff --git a/scripts/install_pnpm.sh b/scripts/install_pnpm.sh new file mode 100755 index 000000000..34ba47b07 --- /dev/null +++ b/scripts/install_pnpm.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +# Install pnpm +if ! command -v pnpm &> /dev/null; then + echo "pnpm is not installed, installing it now..." + npm i pnpm --global +fi + diff --git a/scripts/install_rln_tests_dependencies.sh b/scripts/install_rln_tests_dependencies.sh new file mode 100755 index 000000000..e19e0ef3c --- /dev/null +++ b/scripts/install_rln_tests_dependencies.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +# Install Anvil +./scripts/install_anvil.sh + +#Install pnpm +./scripts/install_pnpm.sh \ No newline at end of file diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 07e0cd895..2723fac8f 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -108,4 +108,4 @@ import import ./waku_rln_relay/test_all # Node Factory -import ./factory/test_external_config +import ./factory/[test_external_config, test_node_factory, test_waku_conf] diff --git a/tests/factory/test_waku_conf.nim b/tests/factory/test_waku_conf.nim index 6b7040dd5..c18a2c73c 100644 --- a/tests/factory/test_waku_conf.nim +++ b/tests/factory/test_waku_conf.nim @@ -9,7 +9,7 @@ import testutils/unittests import waku/factory/waku_conf, - waku/factory/waku_conf_builder, + waku/factory/conf_builder/conf_builder, waku/factory/networks_config, waku/common/utils/parse_size_units @@ -24,7 +24,7 @@ suite "Waku Conf - build with cluster conf": let expectedShards = toSeq[0.uint16 .. 7.uint16] ## Given - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) builder.withClusterConf(clusterConf) builder.withRelay(true) builder.rlnRelayConf.withTreePath("/tmp/test-tree-path") @@ -65,7 +65,7 @@ suite "Waku Conf - build with cluster conf": let expectedShards = toSeq[0.uint16 .. 7.uint16] ## Given - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) builder.withClusterConf(clusterConf) builder.withRelay(false) @@ -95,7 +95,7 @@ suite "Waku Conf - build with cluster conf": expectedShards = toSeq[0.uint16 .. 7.uint16] ## Given - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) builder.withClusterConf(clusterConf) builder.rlnRelayConf.withEnabled(false) @@ -122,7 +122,7 @@ suite "Waku Conf - build with cluster conf": let shards = @[2.uint16, 3.uint16] ## Given - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) builder.withClusterConf(clusterConf) builder.withShards(shards) @@ -148,7 +148,7 @@ suite "Waku Conf - build with cluster conf": let shards = @[2.uint16, 10.uint16] ## Given - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) builder.withClusterConf(clusterConf) builder.withShards(shards) @@ -158,11 +158,11 @@ suite "Waku Conf - build with cluster conf": ## Then assert resConf.isErr(), "Invalid shard was accepted" - test "Cluster Conf is passed and RLN contract is overridden": + test "Cluster Conf is passed and RLN contract is **not** overridden": ## Setup let clusterConf = ClusterConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() - builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/") + builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) # Mount all shards in network let expectedShards = toSeq[0.uint16 .. 7.uint16] @@ -194,7 +194,8 @@ suite "Waku Conf - build with cluster conf": assert conf.rlnRelayConf.isSome let rlnRelayConf = conf.rlnRelayConf.get() - check rlnRelayConf.ethContractAddress.string == contractAddress + check rlnRelayConf.ethContractAddress.string == + clusterConf.rlnRelayEthContractAddress check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic check rlnRelayConf.chainId == clusterConf.rlnRelayChainId check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index b19d15030..3de48a738 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -3,7 +3,7 @@ {.push raises: [].} import - std/[options, sequtils, deques, random], + std/[options, sequtils, deques, random, locks], results, stew/byteutils, testutils/unittests, @@ -28,58 +28,71 @@ import ../testlib/wakucore, ./utils_onchain +var testLock: Lock +initLock(testLock) + suite "Onchain group manager": - # We run Anvil - let runAnvil {.used.} = runAnvil() + setup: + # Acquire lock to ensure tests run sequentially + acquire(testLock) - var manager {.threadvar.}: OnchainGroupManager + let runAnvil {.used.} = runAnvil() - asyncSetup: - manager = await setupOnchainGroupManager() + var manager {.threadvar.}: OnchainGroupManager + manager = waitFor setupOnchainGroupManager() - asyncTeardown: - await manager.stop() + teardown: + waitFor manager.stop() + stopAnvil(runAnvil) + # Release lock after test completes + release(testLock) - asyncTest "should initialize successfully": - (await manager.init()).isOkOr: + test "should initialize successfully": + (waitFor manager.init()).isOkOr: raiseAssert $error check: manager.ethRpc.isSome() manager.wakuRlnContract.isSome() manager.initialized - manager.rlnRelayMaxMessageLimit == 100 + manager.rlnRelayMaxMessageLimit == 600 - asyncTest "should error on initialization when chainId does not match": + test "should error on initialization when chainId does not match": manager.chainId = utils_onchain.CHAIN_ID + 1 - (await manager.init()).isErrOr: + (waitFor manager.init()).isErrOr: raiseAssert "Expected error when chainId does not match" - asyncTest "should initialize when chainId is set to 0": + test "should initialize when chainId is set to 0": manager.chainId = 0x0'u256 - - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error - asyncTest "should error on initialization when loaded metadata does not match": - (await manager.init()).isOkOr: + test "should error on initialization when loaded metadata does not match": + (waitFor manager.init()).isOkOr: assert false, $error - let metadataSetRes = manager.setMetadata() assert metadataSetRes.isOk(), metadataSetRes.error let metadataOpt = manager.rlnInstance.getMetadata().valueOr: assert false, $error return - assert metadataOpt.isSome(), "metadata is not set" let metadata = metadataOpt.get() - - assert metadata.chainId == 1337, "chainId is not equal to 1337" + assert metadata.chainId == 1234, "chainId is not equal to 1234" assert metadata.contractAddress == manager.ethContractAddress, "contractAddress is not equal to " & manager.ethContractAddress - - let differentContractAddress = await uploadRLNContract(manager.ethClientUrls[0]) + let web3 = manager.ethRpc.get() + let accounts = waitFor web3.provider.eth_accounts() + web3.defaultAccount = accounts[2] + let (privateKey, acc) = createEthAccount(web3) + let tokenAddress = (waitFor deployTestToken(privateKey, acc, web3)).valueOr: + assert false, "Failed to deploy test token contract: " & $error + return + let differentContractAddress = ( + waitFor executeForgeContractDeployScripts(privateKey, acc, web3) + ).valueOr: + assert false, "Failed to deploy RLN contract: " & $error + return # simulating a change in the contractAddress let manager2 = OnchainGroupManager( ethClientUrls: @[EthClient], @@ -89,52 +102,47 @@ suite "Onchain group manager": assert false, errStr , ) - let e = await manager2.init() + let e = waitFor manager2.init() (e).isErrOr: assert false, "Expected error when contract address doesn't match" - asyncTest "should error if contract does not exist": + test "should error if contract does not exist": manager.ethContractAddress = "0x0000000000000000000000000000000000000000" - var triggeredError = false - try: - discard await manager.init() - except CatchableError: - triggeredError = true + (waitFor manager.init()).isErrOr: + raiseAssert "Expected error when contract address doesn't exist" - check triggeredError - - asyncTest "should error when keystore path and password are provided but file doesn't exist": + test "should error when keystore path and password are provided but file doesn't exist": manager.keystorePath = some("/inexistent/file") manager.keystorePassword = some("password") - (await manager.init()).isErrOr: + (waitFor manager.init()).isErrOr: raiseAssert "Expected error when keystore file doesn't exist" - asyncTest "trackRootChanges: start tracking roots": - (await manager.init()).isOkOr: + test "trackRootChanges: start tracking roots": + (waitFor manager.init()).isOkOr: raiseAssert $error discard manager.trackRootChanges() - asyncTest "trackRootChanges: should guard against uninitialized state": + test "trackRootChanges: should guard against uninitialized state": try: discard manager.trackRootChanges() except CatchableError: check getCurrentExceptionMsg().len == 38 - asyncTest "trackRootChanges: should sync to the state of the group": + test "trackRootChanges: should sync to the state of the group": let credentials = generateCredentials(manager.rlnInstance) - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error let merkleRootBefore = manager.fetchMerkleRoot() try: - await manager.register(credentials, UserMessageLimit(1)) + waitFor manager.register(credentials, UserMessageLimit(20)) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - discard await withTimeout(trackRootChanges(manager), 15.seconds) + discard waitFor withTimeout(trackRootChanges(manager), 15.seconds) let merkleRootAfter = manager.fetchMerkleRoot() @@ -151,7 +159,7 @@ suite "Onchain group manager": metadata.validRoots == manager.validRoots.toSeq() merkleRootBefore != merkleRootAfter - asyncTest "trackRootChanges: should fetch history correctly": + test "trackRootChanges: should fetch history correctly": # TODO: We can't use `trackRootChanges()` directly in this test because its current implementation # relies on a busy loop rather than event-based monitoring. As a result, some root changes # may be missed, leading to inconsistent test results (i.e., it may randomly return true or false). @@ -159,15 +167,16 @@ suite "Onchain group manager": # after each registration. const credentialCount = 6 let credentials = generateCredentials(manager.rlnInstance, credentialCount) - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error let merkleRootBefore = manager.fetchMerkleRoot() try: for i in 0 ..< credentials.len(): - await manager.register(credentials[i], UserMessageLimit(1)) - discard await manager.updateRoots() + debug "Registering credential", index = i, credential = credentials[i] + waitFor manager.register(credentials[i], UserMessageLimit(20)) + discard waitFor manager.updateRoots() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() @@ -177,13 +186,13 @@ suite "Onchain group manager": merkleRootBefore != merkleRootAfter manager.validRoots.len() == credentialCount - asyncTest "register: should guard against uninitialized state": + test "register: should guard against uninitialized state": let dummyCommitment = default(IDCommitment) try: - await manager.register( + waitFor manager.register( RateCommitment( - idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(1) + idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20) ) ) except CatchableError: @@ -191,18 +200,18 @@ suite "Onchain group manager": except Exception: assert false, "exception raised: " & getCurrentExceptionMsg() - asyncTest "register: should register successfully": + test "register: should register successfully": # TODO :- similar to ```trackRootChanges: should fetch history correctly``` - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error let idCommitment = generateCredentials(manager.rlnInstance).idCommitment let merkleRootBefore = manager.fetchMerkleRoot() try: - await manager.register( + waitFor manager.register( RateCommitment( - idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1) + idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20) ) ) except Exception, CatchableError: @@ -215,47 +224,47 @@ suite "Onchain group manager": merkleRootAfter != merkleRootBefore manager.latestIndex == 1 - asyncTest "register: callback is called": + test "register: callback is called": let idCredentials = generateCredentials(manager.rlnInstance) let idCommitment = idCredentials.idCommitment let fut = newFuture[void]() proc callback(registrations: seq[Membership]): Future[void] {.async.} = - let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1)).get() + let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(20)).get() check: registrations.len == 1 registrations[0].rateCommitment == rateCommitment registrations[0].index == 0 fut.complete() - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error manager.onRegister(callback) try: - await manager.register( + waitFor manager.register( RateCommitment( - idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1) + idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20) ) ) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await fut + waitFor fut - asyncTest "withdraw: should guard against uninitialized state": + test "withdraw: should guard against uninitialized state": let idSecretHash = generateCredentials(manager.rlnInstance).idSecretHash try: - await manager.withdraw(idSecretHash) + waitFor manager.withdraw(idSecretHash) except CatchableError: assert true except Exception: assert false, "exception raised: " & getCurrentExceptionMsg() - asyncTest "validateRoot: should validate good root": + test "validateRoot: should validate good root": let idCredentials = generateCredentials(manager.rlnInstance) let idCommitment = idCredentials.idCommitment @@ -264,27 +273,27 @@ suite "Onchain group manager": proc callback(registrations: seq[Membership]): Future[void] {.async.} = if registrations.len == 1 and registrations[0].rateCommitment == - getRateCommitment(idCredentials, UserMessageLimit(1)).get() and + getRateCommitment(idCredentials, UserMessageLimit(20)).get() and registrations[0].index == 0: manager.idCredentials = some(idCredentials) fut.complete() manager.onRegister(callback) - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error try: - await manager.register(idCredentials, UserMessageLimit(1)) + waitFor manager.register(idCredentials, UserMessageLimit(20)) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await fut + waitFor fut - let rootUpdated = await manager.updateRoots() + let rootUpdated = waitFor manager.updateRoots() if rootUpdated: - let proofResult = await manager.fetchMerkleProofElements() + let proofResult = waitFor manager.fetchMerkleProofElements() if proofResult.isErr(): error "Failed to fetch Merkle proof", error = proofResult.error manager.merkleProofCache = proofResult.get() @@ -306,14 +315,14 @@ suite "Onchain group manager": check: validated - asyncTest "validateRoot: should reject bad root": + test "validateRoot: should reject bad root": let idCredentials = generateCredentials(manager.rlnInstance) let idCommitment = idCredentials.idCommitment - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error - manager.userMessageLimit = some(UserMessageLimit(1)) + manager.userMessageLimit = some(UserMessageLimit(20)) manager.membershipIndex = some(MembershipIndex(0)) manager.idCredentials = some(idCredentials) @@ -339,9 +348,9 @@ suite "Onchain group manager": check: validated == false - asyncTest "verifyProof: should verify valid proof": + test "verifyProof: should verify valid proof": let credentials = generateCredentials(manager.rlnInstance) - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error let fut = newFuture[void]() @@ -349,7 +358,7 @@ suite "Onchain group manager": proc callback(registrations: seq[Membership]): Future[void] {.async.} = if registrations.len == 1 and registrations[0].rateCommitment == - getRateCommitment(credentials, UserMessageLimit(1)).get() and + getRateCommitment(credentials, UserMessageLimit(20)).get() and registrations[0].index == 0: manager.idCredentials = some(credentials) fut.complete() @@ -357,15 +366,15 @@ suite "Onchain group manager": manager.onRegister(callback) try: - await manager.register(credentials, UserMessageLimit(1)) + waitFor manager.register(credentials, UserMessageLimit(20)) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await fut + waitFor fut - let rootUpdated = await manager.updateRoots() + let rootUpdated = waitFor manager.updateRoots() if rootUpdated: - let proofResult = await manager.fetchMerkleProofElements() + let proofResult = waitFor manager.fetchMerkleProofElements() if proofResult.isErr(): error "Failed to fetch Merkle proof", error = proofResult.error manager.merkleProofCache = proofResult.get() @@ -388,21 +397,21 @@ suite "Onchain group manager": check: verified - asyncTest "verifyProof: should reject invalid proof": - (await manager.init()).isOkOr: + test "verifyProof: should reject invalid proof": + (waitFor manager.init()).isOkOr: raiseAssert $error let idCredential = generateCredentials(manager.rlnInstance) try: - await manager.register(idCredential, UserMessageLimit(1)) + waitFor manager.register(idCredential, UserMessageLimit(20)) except Exception, CatchableError: assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() let messageBytes = "Hello".toBytes() - let rootUpdated = await manager.updateRoots() + let rootUpdated = waitFor manager.updateRoots() manager.merkleProofCache = newSeq[byte](640) for i in 0 ..< 640: @@ -427,10 +436,10 @@ suite "Onchain group manager": check: verified == false - asyncTest "root queue should be updated correctly": + test "root queue should be updated correctly": const credentialCount = 12 let credentials = generateCredentials(manager.rlnInstance, credentialCount) - (await manager.init()).isOkOr: + (waitFor manager.init()).isOkOr: raiseAssert $error type TestBackfillFuts = array[0 .. credentialCount - 1, Future[void]] @@ -445,7 +454,7 @@ suite "Onchain group manager": proc callback(registrations: seq[Membership]): Future[void] {.async.} = if registrations.len == 1 and registrations[0].rateCommitment == - getRateCommitment(credentials[futureIndex], UserMessageLimit(1)).get() and + getRateCommitment(credentials[futureIndex], UserMessageLimit(20)).get() and registrations[0].index == MembershipIndex(futureIndex): futs[futureIndex].complete() futureIndex += 1 @@ -456,47 +465,40 @@ suite "Onchain group manager": manager.onRegister(generateCallback(futures, credentials)) for i in 0 ..< credentials.len(): - await manager.register(credentials[i], UserMessageLimit(1)) - discard await manager.updateRoots() + waitFor manager.register(credentials[i], UserMessageLimit(20)) + discard waitFor manager.updateRoots() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - await allFutures(futures) + waitFor allFutures(futures) check: manager.validRoots.len() == credentialCount - asyncTest "isReady should return false if ethRpc is none": - (await manager.init()).isOkOr: + test "isReady should return false if ethRpc is none": + (waitFor manager.init()).isOkOr: raiseAssert $error manager.ethRpc = none(Web3) var isReady = true try: - isReady = await manager.isReady() + isReady = waitFor manager.isReady() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() check: isReady == false - asyncTest "isReady should return true if ethRpc is ready": - (await manager.init()).isOkOr: + test "isReady should return true if ethRpc is ready": + (waitFor manager.init()).isOkOr: raiseAssert $error var isReady = false try: - isReady = await manager.isReady() + isReady = waitFor manager.isReady() except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() check: isReady == true - - ################################ - ## Terminating/removing Anvil - ################################ - - # We stop Anvil daemon - stopAnvil(runAnvil) diff --git a/tests/waku_rln_relay/utils_onchain.nim b/tests/waku_rln_relay/utils_onchain.nim index 0c7fcce26..9066b0292 100644 --- a/tests/waku_rln_relay/utils_onchain.nim +++ b/tests/waku_rln_relay/utils_onchain.nim @@ -30,7 +30,7 @@ import ../testlib/common, ./utils -const CHAIN_ID* = 1337'u256 +const CHAIN_ID* = 1234'u256 template skip0xPrefix(hexStr: string): int = ## Returns the index of the first meaningful char in `hexStr` by skipping @@ -61,64 +61,347 @@ proc generateCredentials*(rlnInstance: ptr RLN, n: int): seq[IdentityCredential] credentials.add(generateCredentials(rlnInstance)) return credentials -# a util function used for testing purposes -# it deploys membership contract on Anvil (or any Eth client available on EthClient address) -# must be edited if used for a different contract than membership contract -# -proc uploadRLNContract*(ethClientAddress: string): Future[Address] {.async.} = - let web3 = await newWeb3(ethClientAddress) - debug "web3 connected to", ethClientAddress +proc getContractAddressFromDeployScriptOutput(output: string): Result[string, string] = + const searchStr = "Return ==\n0: address " + const addressLength = 42 # Length of an Ethereum address in hex format + let idx = output.find(searchStr) + if idx >= 0: + let startPos = idx + searchStr.len + let endPos = output.find('\n', startPos) + if (endPos - startPos) >= addressLength: + let address = output[startPos ..< endPos] + return ok(address) + return err("Unable to find contract address in deploy script output") - # fetch the list of registered accounts - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[1] - let add = web3.defaultAccount - debug "contract deployer account address ", add +proc getForgePath(): string = + var forgePath = "" + if existsEnv("XDG_CONFIG_HOME"): + forgePath = joinPath(forgePath, os.getEnv("XDG_CONFIG_HOME", "")) + else: + forgePath = joinPath(forgePath, os.getEnv("HOME", "")) + forgePath = joinPath(forgePath, ".foundry/bin/forge") + return $forgePath - let balance = - await web3.provider.eth_getBalance(web3.defaultAccount, blockId("latest")) - debug "Initial account balance: ", balance +contract(ERC20Token): + proc allowance(owner: Address, spender: Address): UInt256 {.view.} + proc balanceOf(account: Address): UInt256 {.view.} - # deploy poseidon hasher bytecode - let poseidonT3Receipt = await web3.deployContract(PoseidonT3) - let poseidonT3Address = poseidonT3Receipt.contractAddress.get() - let poseidonAddressStripped = strip0xPrefix($poseidonT3Address) +proc getTokenBalance( + web3: Web3, tokenAddress: Address, account: Address +): Future[UInt256] {.async.} = + let token = web3.contractSender(ERC20Token, tokenAddress) + return await token.balanceOf(account).call() - # deploy lazy imt bytecode - let lazyImtReceipt = await web3.deployContract( - LazyIMT.replace("__$PoseidonT3$__", poseidonAddressStripped) - ) - let lazyImtAddress = lazyImtReceipt.contractAddress.get() - let lazyImtAddressStripped = strip0xPrefix($lazyImtAddress) +proc ethToWei(eth: UInt256): UInt256 = + eth * 1000000000000000000.u256 - # deploy waku rlnv2 contract - let wakuRlnContractReceipt = await web3.deployContract( - WakuRlnV2Contract.replace("__$PoseidonT3$__", poseidonAddressStripped).replace( - "__$LazyIMT$__", lazyImtAddressStripped +proc sendMintCall( + web3: Web3, + accountFrom: Address, + tokenAddress: Address, + recipientAddress: Address, + amountTokens: UInt256, + recipientBalanceBeforeExpectedTokens: Option[UInt256] = none(UInt256), +): Future[TxHash] {.async.} = + let doBalanceAssert = recipientBalanceBeforeExpectedTokens.isSome() + + if doBalanceAssert: + let balanceBeforeMint = await getTokenBalance(web3, tokenAddress, recipientAddress) + let balanceBeforeExpectedTokens = recipientBalanceBeforeExpectedTokens.get() + assert balanceBeforeMint == balanceBeforeExpectedTokens, + fmt"Balance is {balanceBeforeMint} before minting but expected {balanceBeforeExpectedTokens}" + + # Create mint transaction + # Method ID for mint(address,uint256) is 0x40c10f19 which is part of the openzeppelin ERC20 standard + # The method ID for a deployed test token can be viewed here https://sepolia.lineascan.build/address/0x185A0015aC462a0aECb81beCc0497b649a64B9ea#writeContract + let mintSelector = "0x40c10f19" + let addressHex = recipientAddress.toHex() + # Pad the address and amount to 32 bytes each + let paddedAddress = addressHex.align(64, '0') + + let amountHex = amountTokens.toHex() + let amountWithout0x = + if amountHex.toLower().startsWith("0x"): + amountHex[2 .. ^1] + else: + amountHex + let paddedAmount = amountWithout0x.align(64, '0') + let mintCallData = mintSelector & paddedAddress & paddedAmount + let gasPrice = int(await web3.provider.eth_gasPrice()) + + # Create the transaction + var tx: TransactionArgs + tx.`from` = Opt.some(accountFrom) + tx.to = Opt.some(tokenAddress) + tx.value = Opt.some(0.u256) # No ETH is sent for token operations + tx.gasPrice = Opt.some(Quantity(gasPrice)) + tx.data = Opt.some(byteutils.hexToSeqByte(mintCallData)) + + trace "Sending mint call" + let txHash = await web3.send(tx) + + let balanceOfSelector = "0x70a08231" + let balanceCallData = balanceOfSelector & paddedAddress + + # Wait a bit for transaction to be mined + await sleepAsync(500.milliseconds) + + if doBalanceAssert: + let balanceAfterMint = await getTokenBalance(web3, tokenAddress, recipientAddress) + let balanceAfterExpectedTokens = + recipientBalanceBeforeExpectedTokens.get() + amountTokens + assert balanceAfterMint == balanceAfterExpectedTokens, + fmt"Balance is {balanceAfterMint} after transfer but expected {balanceAfterExpectedTokens}" + + return txHash + +# Check how many tokens a spender (the RLN contract) is allowed to spend on behalf of the owner (account which wishes to register a membership) +proc checkTokenAllowance( + web3: Web3, tokenAddress: Address, owner: Address, spender: Address +): Future[UInt256] {.async.} = + let token = web3.contractSender(ERC20Token, tokenAddress) + let allowance = await token.allowance(owner, spender).call() + trace "Current allowance", owner = owner, spender = spender, allowance = allowance + return allowance + +proc setupContractDeployment( + forgePath: string, submodulePath: string +): Result[void, string] = + trace "Contract deployer paths", forgePath = forgePath, submodulePath = submodulePath + # Build the Foundry project + try: + let (forgeCleanOutput, forgeCleanExitCode) = + execCmdEx(fmt"""cd {submodulePath} && {forgePath} clean""") + trace "Executed forge clean command", output = forgeCleanOutput + if forgeCleanExitCode != 0: + return err("forge clean command failed") + + let (forgeInstallOutput, forgeInstallExitCode) = + execCmdEx(fmt"""cd {submodulePath} && {forgePath} install""") + trace "Executed forge install command", output = forgeInstallOutput + if forgeInstallExitCode != 0: + return err("forge install command failed") + + let (pnpmInstallOutput, pnpmInstallExitCode) = + execCmdEx(fmt"""cd {submodulePath} && pnpm install""") + trace "Executed pnpm install command", output = pnpmInstallOutput + if pnpmInstallExitCode != 0: + return err("pnpm install command failed" & pnpmInstallOutput) + + let (forgeBuildOutput, forgeBuildExitCode) = + execCmdEx(fmt"""cd {submodulePath} && {forgePath} build""") + trace "Executed forge build command", output = forgeBuildOutput + if forgeBuildExitCode != 0: + return err("forge build command failed") + + # Set the environment variable API keys to anything for local testnet deployment + putEnv("API_KEY_CARDONA", "123") + putEnv("API_KEY_LINEASCAN", "123") + putEnv("API_KEY_ETHERSCAN", "123") + except OSError, IOError: + return err("Command execution failed: " & getCurrentExceptionMsg()) + return ok() + +proc deployTestToken*( + pk: keys.PrivateKey, acc: Address, web3: Web3 +): Future[Result[Address, string]] {.async.} = + ## Executes a Foundry forge script that deploys the a token contract (ERC-20) used for testing. This is a prerequisite to enable the contract deployment and this token contract address needs to be minted and approved for the accounts that need to register memberships with the contract + ## submodulePath: path to the submodule containing contract deploy scripts + + # All RLN related tests should be run from the root directory of the project + let submodulePath = absolutePath("./vendor/waku-rlnv2-contract") + + # Verify submodule path exists + if not dirExists(submodulePath): + error "Submodule path does not exist", submodulePath = submodulePath + return err("Submodule path does not exist: " & submodulePath) + + let forgePath = getForgePath() + + setupContractDeployment(forgePath, submodulePath).isOkOr: + error "Failed to setup contract deployment", error = $error + return err("Failed to setup contract deployment: " & $error) + + # Deploy TestToken contract + let forgeCmdTestToken = + fmt"""cd {submodulePath} && {forgePath} script test/TestToken.sol --broadcast -vvv --rpc-url http://localhost:8540 --tc TestTokenFactory --private-key {pk} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json""" + let (outputDeployTestToken, exitCodeDeployTestToken) = execCmdEx(forgeCmdTestToken) + trace "Executed forge command to deploy TestToken contract", + output = outputDeployTestToken + if exitCodeDeployTestToken != 0: + return error("Forge command to deploy TestToken contract failed") + + # Parse the command output to find contract address + let testTokenAddress = getContractAddressFromDeployScriptOutput(outputDeployTestToken).valueOr: + error "Failed to get TestToken contract address from deploy script output", + error = $error + return err( + "Failed to get TestToken contract address from deploy script output: " & $error ) - ) - let wakuRlnContractAddress = wakuRlnContractReceipt.contractAddress.get() - let wakuRlnAddressStripped = strip0xPrefix($wakuRlnContractAddress) + debug "Address of the TestToken contract", testTokenAddress - debug "Address of the deployed rlnv2 contract: ", wakuRlnContractAddress + let testTokenAddressBytes = hexToByteArray[20](testTokenAddress) + let testTokenAddressAddress = Address(testTokenAddressBytes) + putEnv("TOKEN_ADDRESS", testTokenAddressAddress.toHex()) - # need to send concat: impl & init_bytes - let contractInput = - byteutils.toHex(encode(wakuRlnContractAddress)) & Erc1967ProxyContractInput - debug "contractInput", contractInput - let proxyReceipt = - await web3.deployContract(Erc1967Proxy, contractInput = contractInput) + return ok(testTokenAddressAddress) - debug "proxy receipt", contractAddress = proxyReceipt.contractAddress.get() - let proxyAddress = proxyReceipt.contractAddress.get() +# Sends an ERC20 token approval call to allow a spender to spend a certain amount of tokens on behalf of the owner +proc approveTokenAllowanceAndVerify*( + web3: Web3, + accountFrom: Address, + privateKey: keys.PrivateKey, + tokenAddress: Address, + spender: Address, + amountWei: UInt256, + expectedAllowanceBefore: Option[UInt256] = none(UInt256), +): Future[Result[TxHash, string]] {.async.} = + var allowanceBefore: UInt256 + if expectedAllowanceBefore.isSome(): + allowanceBefore = + await checkTokenAllowance(web3, tokenAddress, accountFrom, spender) + let expected = expectedAllowanceBefore.get() + if allowanceBefore != expected: + return + err(fmt"Allowance is {allowanceBefore} before approval but expected {expected}") - let newBalance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest") - debug "Account balance after the contract deployment: ", newBalance + # Temporarily set the private key + let oldPrivateKey = web3.privateKey + web3.privateKey = Opt.some(privateKey) + web3.lastKnownNonce = Opt.none(Quantity) + + try: + # ERC20 approve function signature: approve(address spender, uint256 amount) + # Method ID for approve(address,uint256) is 0x095ea7b3 + const APPROVE_SELECTOR = "0x095ea7b3" + let addressHex = spender.toHex().align(64, '0') + let amountHex = amountWei.toHex().align(64, '0') + let approveCallData = APPROVE_SELECTOR & addressHex & amountHex + + let gasPrice = await web3.provider.eth_gasPrice() + + var tx: TransactionArgs + tx.`from` = Opt.some(accountFrom) + tx.to = Opt.some(tokenAddress) + tx.value = Opt.some(0.u256) + tx.gasPrice = Opt.some(gasPrice) + tx.gas = Opt.some(Quantity(100000)) + tx.data = Opt.some(byteutils.hexToSeqByte(approveCallData)) + tx.chainId = Opt.some(CHAIN_ID) + + trace "Sending approve call", tx = tx + let txHash = await web3.send(tx) + let receipt = await web3.getMinedTransactionReceipt(txHash) + + if receipt.status.isNone(): + return err("Approval transaction failed receipt is none") + if receipt.status.get() != 1.Quantity: + return err("Approval transaction failed status quantity not 1") + + # Single verification check after mining (no extra sleep needed) + let allowanceAfter = + await checkTokenAllowance(web3, tokenAddress, accountFrom, spender) + let expectedAfter = + if expectedAllowanceBefore.isSome(): + expectedAllowanceBefore.get() + amountWei + else: + amountWei + + if allowanceAfter < expectedAfter: + return err( + fmt"Allowance is {allowanceAfter} after approval but expected at least {expectedAfter}" + ) + + return ok(txHash) + except CatchableError as e: + return err(fmt"Failed to send approve transaction: {e.msg}") + finally: + # Restore the old private key + web3.privateKey = oldPrivateKey + +proc executeForgeContractDeployScripts*( + privateKey: keys.PrivateKey, acc: Address, web3: Web3 +): Future[Result[Address, string]] {.async, gcsafe.} = + ## Executes a set of foundry forge scripts required to deploy the RLN contract and returns the deployed proxy contract address + ## submodulePath: path to the submodule containing contract deploy scripts + + # All RLN related tests should be run from the root directory of the project + let submodulePath = "./vendor/waku-rlnv2-contract" + + # Verify submodule path exists + if not dirExists(submodulePath): + error "Submodule path does not exist", submodulePath = submodulePath + return err("Submodule path does not exist: " & submodulePath) + + let forgePath = getForgePath() + debug "Forge path", forgePath + + # Verify forge executable exists + if not fileExists(forgePath): + error "Forge executable not found", forgePath = forgePath + return err("Forge executable not found: " & forgePath) + + trace "contract deployer account details", account = acc, privateKey = privateKey + let setupContractEnv = setupContractDeployment(forgePath, submodulePath) + if setupContractEnv.isErr(): + error "Failed to setup contract deployment" + return err("Failed to setup contract deployment") + + # Deploy LinearPriceCalculator contract + let forgeCmdPriceCalculator = + fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployPriceCalculator --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json""" + let (outputDeployPriceCalculator, exitCodeDeployPriceCalculator) = + execCmdEx(forgeCmdPriceCalculator) + trace "Executed forge command to deploy LinearPriceCalculator contract", + output = outputDeployPriceCalculator + if exitCodeDeployPriceCalculator != 0: + return error("Forge command to deploy LinearPriceCalculator contract failed") + + # Parse the output to find contract address + let priceCalculatorAddressRes = + getContractAddressFromDeployScriptOutput(outputDeployPriceCalculator) + if priceCalculatorAddressRes.isErr(): + error "Failed to get LinearPriceCalculator contract address from deploy script output" + let priceCalculatorAddress = priceCalculatorAddressRes.get() + debug "Address of the LinearPriceCalculator contract", priceCalculatorAddress + putEnv("PRICE_CALCULATOR_ADDRESS", priceCalculatorAddress) + + let forgeCmdWakuRln = + fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployWakuRlnV2 --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json""" + let (outputDeployWakuRln, exitCodeDeployWakuRln) = execCmdEx(forgeCmdWakuRln) + trace "Executed forge command to deploy WakuRlnV2 contract", + output = outputDeployWakuRln + if exitCodeDeployWakuRln != 0: + error "Forge command to deploy WakuRlnV2 contract failed", + output = outputDeployWakuRln + + # Parse the output to find contract address + let wakuRlnV2AddressRes = + getContractAddressFromDeployScriptOutput(outputDeployWakuRln) + if wakuRlnV2AddressRes.isErr(): + error "Failed to get WakuRlnV2 contract address from deploy script output" + ##TODO: raise exception here? + let wakuRlnV2Address = wakuRlnV2AddressRes.get() + debug "Address of the WakuRlnV2 contract", wakuRlnV2Address + putEnv("WAKURLNV2_ADDRESS", wakuRlnV2Address) + + # Deploy Proxy contract + let forgeCmdProxy = + fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployProxy --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json""" + let (outputDeployProxy, exitCodeDeployProxy) = execCmdEx(forgeCmdProxy) + trace "Executed forge command to deploy proxy contract", output = outputDeployProxy + if exitCodeDeployProxy != 0: + error "Forge command to deploy Proxy failed", error = outputDeployProxy + return err("Forge command to deploy Proxy failed") + + let proxyAddress = getContractAddressFromDeployScriptOutput(outputDeployProxy) + let proxyAddressBytes = hexToByteArray[20](proxyAddress.get()) + let proxyAddressAddress = Address(proxyAddressBytes) + + info "Address of the Proxy contract", proxyAddressAddress await web3.close() - debug "disconnected from ", ethClientAddress - - return proxyAddress + return ok(proxyAddressAddress) proc sendEthTransfer*( web3: Web3, @@ -133,7 +416,7 @@ proc sendEthTransfer*( let balanceBeforeWei = await web3.provider.eth_getBalance(accountTo, "latest") let balanceBeforeExpectedWei = accountToBalanceBeforeExpectedWei.get() assert balanceBeforeWei == balanceBeforeExpectedWei, - fmt"Balance is {balanceBeforeWei} but expected {balanceBeforeExpectedWei}" + fmt"Balance is {balanceBeforeWei} before transfer but expected {balanceBeforeExpectedWei}" let gasPrice = int(await web3.provider.eth_gasPrice()) @@ -146,17 +429,17 @@ proc sendEthTransfer*( # TODO: handle the error if sending fails let txHash = await web3.send(tx) + # Wait a bit for transaction to be mined + await sleepAsync(200.milliseconds) + if doBalanceAssert: let balanceAfterWei = await web3.provider.eth_getBalance(accountTo, "latest") let balanceAfterExpectedWei = accountToBalanceBeforeExpectedWei.get() + amountWei assert balanceAfterWei == balanceAfterExpectedWei, - fmt"Balance is {balanceAfterWei} but expected {balanceAfterExpectedWei}" + fmt"Balance is {balanceAfterWei} after transfer but expected {balanceAfterExpectedWei}" return txHash -proc ethToWei(eth: UInt256): UInt256 = - eth * 1000000000000000000.u256 - proc createEthAccount*( ethAmount: UInt256 = 1000.u256 ): Future[(keys.PrivateKey, Address)] {.async.} = @@ -198,7 +481,7 @@ proc getAnvilPath*(): string = return $anvilPath # Runs Anvil daemon -proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process = +proc runAnvil*(port: int = 8540, chainId: string = "1234"): Process = # Passed options are # --port Port to listen on. # --gas-limit Sets the block gas limit in WEI. @@ -212,13 +495,13 @@ proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process = anvilPath, args = [ "--port", - "8540", + $port, "--gas-limit", "300000000000000", "--balance", "1000000000", "--chain-id", - $CHAIN_ID, + $chainId, ], options = {poUsePath}, ) @@ -242,14 +525,26 @@ proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process = # Stops Anvil daemon proc stopAnvil*(runAnvil: Process) {.used.} = + if runAnvil.isNil: + debug "stopAnvil called with nil Process" + return + let anvilPID = runAnvil.processID - # We wait the daemon to exit + debug "Stopping Anvil daemon", anvilPID = anvilPID + try: - # We terminate Anvil daemon by sending a SIGTERM signal to the runAnvil PID to trigger RPC server termination and clean-up - kill(runAnvil) - debug "Sent SIGTERM to Anvil", anvilPID = anvilPID - except: - error "Anvil daemon termination failed: ", err = getCurrentExceptionMsg() + # Send termination signals + when not defined(windows): + discard execCmdEx(fmt"kill -TERM {anvilPID}") + discard execCmdEx(fmt"kill -9 {anvilPID}") + else: + discard execCmdEx(fmt"taskkill /F /PID {anvilPID}") + + # Close Process object to release resources + close(runAnvil) + debug "Anvil daemon stopped", anvilPID = anvilPID + except Exception as e: + debug "Error stopping Anvil daemon", anvilPID = anvilPID, error = e.msg proc setupOnchainGroupManager*( ethClientUrl: string = EthClient, amountEth: UInt256 = 10.u256 @@ -261,12 +556,10 @@ proc setupOnchainGroupManager*( let rlnInstance = rlnInstanceRes.get() - let contractAddress = await uploadRLNContract(ethClientUrl) # connect to the eth client let web3 = await newWeb3(ethClientUrl) - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[0] + web3.defaultAccount = accounts[1] let (privateKey, acc) = createEthAccount(web3) @@ -276,6 +569,32 @@ proc setupOnchainGroupManager*( web3, web3.defaultAccount, acc, ethToWei(1000.u256), some(0.u256) ) + let testTokenAddress = (await deployTestToken(privateKey, acc, web3)).valueOr: + assert false, "Failed to deploy test token contract: " & $error + return + + # mint the token from the generated account + discard await sendMintCall( + web3, web3.defaultAccount, testTokenAddress, acc, ethToWei(1000.u256), some(0.u256) + ) + + let contractAddress = (await executeForgeContractDeployScripts(privateKey, acc, web3)).valueOr: + assert false, "Failed to deploy RLN contract: " & $error + return + + # If the generated account wishes to register a membership, it needs to approve the contract to spend its tokens + let tokenApprovalResult = await approveTokenAllowanceAndVerify( + web3, + acc, # owner + privateKey, + testTokenAddress, # ERC20 token address + contractAddress, # spender - the proxy contract that will spend the tokens + ethToWei(200.u256), + some(0.u256), # expected allowance before approval + ) + + assert tokenApprovalResult.isOk, tokenApprovalResult.error() + let manager = OnchainGroupManager( ethClientUrls: @[ethClientUrl], ethContractAddress: $contractAddress, diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index e7fd82b57..d5cb601a2 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -26,6 +26,7 @@ proc newTestWakuRecon*( wantsTx: AsyncQueue[PeerId], needsTx: AsyncQueue[(PeerId, Fingerprint)], cluster: uint16 = 1, + syncRange: timer.Duration = DefaultSyncRange, shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7], ): Future[SyncReconciliation] {.async.} = let peerManager = PeerManager.new(switch) @@ -36,6 +37,7 @@ proc newTestWakuRecon*( peerManager = peerManager, wakuArchive = nil, relayJitter = 0.seconds, + syncRange = syncRange, idsRx = idsRx, localWantsTx = wantsTx, remoteNeedsTx = needsTx, diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index efdd6a885..c606934cf 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -1,8 +1,12 @@ {.used.} import - std/[options, sets, random, math], testutils/unittests, chronos, libp2p/crypto/crypto - + std/[options, sets, random, math, algorithm], + testutils/unittests, + chronos, + libp2p/crypto/crypto +import chronos, chronos/asyncsync +import nimcrypto import ../../waku/[ node/peer_manager, @@ -21,6 +25,15 @@ import ../waku_archive/archive_utils, ./sync_utils +proc collectDiffs*( + chan: var Channel[SyncID], diffCount: int +): HashSet[WakuMessageHash] = + var received: HashSet[WakuMessageHash] + while received.len < diffCount: + let sid = chan.recv() # synchronous receive + received.incl sid.hash + result = received + suite "Waku Sync: reconciliation": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch @@ -234,53 +247,376 @@ suite "Waku Sync: reconciliation": remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": - let msgCount = 10_000 - var diffCount = 1_000 + const + msgCount = 200_000 # total messages on the server + diffCount = 100 # messages initially missing on the client - var diffMsgHashes: HashSet[WakuMessageHash] - var randIndexes: HashSet[int] + ## ── choose which messages will be absent from the client ───────────── + var missingIdx: HashSet[int] + while missingIdx.len < diffCount: + missingIdx.incl rand(0 ..< msgCount) - # Diffs - for i in 0 ..< diffCount: - var randInt = rand(0 ..< msgCount) - - #make sure we actually have the right number of diffs - while randInt in randIndexes: - randInt = rand(0 ..< msgCount) - - randIndexes.incl(randInt) - - # sync window is 1 hour, spread msg equally in that time - let timeSlice = calculateTimeRange() - let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) - let (part, _) = divmod(timeWindow, 100_000) - - var timestamp = timeSlice.a + ## ── generate messages and pre-load the two reconcilers ─────────────── + let slice = calculateTimeRange() # 1-hour window + let step = (int64(slice.b) - int64(slice.a)) div msgCount + var ts = slice.a for i in 0 ..< msgCount: let - msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) - hash = computeMessageHash(DefaultPubsubTopic, msg) + msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + h = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) + server.messageIngress(h, msg) # every msg is on the server + if i notin missingIdx: + client.messageIngress(h, msg) # all but 100 are on the client + ts += Timestamp(step) - if i in randIndexes: - diffMsgHashes.incl(hash) + ## ── sanity before we start the round ───────────────────────────────── + check remoteNeeds.len == 0 + + ## ── launch reconciliation from the client towards the server ───────── + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + ## ── verify that ≈100 diffs were queued (allow 10 % slack) ──────────── + check remoteNeeds.len >= 90 # ≈ 100 × 0.9 + + asyncTest "sync 2 nodes 400K msgs 100k diffs": + const + msgCount = 400_000 + diffCount = 100_000 + tol = 1000 + + var diffMsgHashes: HashSet[WakuMessageHash] + var missingIdx: HashSet[int] + while missingIdx.len < diffCount: + missingIdx.incl rand(0 ..< msgCount) + + let slice = calculateTimeRange() + let step = (int64(slice.b) - int64(slice.a)) div msgCount + var ts = slice.a + + for i in 0 ..< msgCount: + let + msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + h = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(h, msg) + if i notin missingIdx: + client.messageIngress(h, msg) else: - client.messageIngress(hash, msg) + diffMsgHashes.incl h - timestamp += Timestamp(part) - continue + ts += Timestamp(step) - check: - remoteNeeds.len == 0 + check remoteNeeds.len == 0 let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error - # timimg issue make it hard to match exact numbers - check: - remoteNeeds.len > 900 + check remoteNeeds.len >= diffCount - tol and remoteNeeds.len < diffCount + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes 100 msgs 20 diff – 1-second window": + const + msgCount = 100 + diffCount = 20 + + var missingIdx: seq[int] = @[] + while missingIdx.len < diffCount: + let n = rand(0 ..< msgCount) + if n notin missingIdx: + missingIdx.add n + + var diffMsgHashes: HashSet[WakuMessageHash] + + let sliceEnd = now() + let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64 + let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount + var ts = sliceStart + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + + if i in missingIdx: + diffMsgHashes.incl hash + else: + client.messageIngress(hash, msg) + + ts += Timestamp(step) + + check remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffCount + + for _ in 0 ..< diffCount: + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes 500k msgs 300k diff – stress window": + const + msgCount = 500_000 + diffCount = 300_000 + + randomize() + var allIdx = newSeq[int](msgCount) + for i in 0 ..< msgCount: + allIdx[i] = i + shuffle(allIdx) + + let missingIdx = allIdx[0 ..< diffCount] + var missingSet: HashSet[int] + for idx in missingIdx: + missingSet.incl idx + + var diffMsgHashes: HashSet[WakuMessageHash] + + let sliceEnd = now() + let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64 + let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount + var ts = sliceStart + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + + if i in missingSet: + diffMsgHashes.incl hash + else: + client.messageIngress(hash, msg) + + ts += Timestamp(step) + + check remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffCount + + for _ in 0 ..< 1000: + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes, 40 msgs: 20 in-window diff, 20 out-window ignored": + const + diffInWin = 20 + diffOutWin = 20 + stepOutNs = 100_000_000'u64 + outOffsetNs = 2_300_000_000'u64 # for 20 mesg they sent 2 seconds earlier + + randomize() + + let nowNs = getNowInNanosecondTime() + let sliceStart = Timestamp(uint64(nowNs) - 700_000_000'u64) + let sliceEnd = nowNs + let stepIn = (sliceEnd.int64 - sliceStart.int64) div diffInWin + + let oldStart = Timestamp(uint64(sliceStart) - outOffsetNs) + let stepOut = Timestamp(stepOutNs) + + var inWinHashes, outWinHashes: HashSet[WakuMessageHash] + + var ts = sliceStart + for _ in 0 ..< diffInWin: + let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + inWinHashes.incl hash + ts += Timestamp(stepIn) + + ts = oldStart + for _ in 0 ..< diffOutWin: + let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + outWinHashes.incl hash + ts += Timestamp(stepOut) + + check remoteNeeds.len == 0 + + let oneSec = timer.seconds(1) + + server = await newTestWakuRecon( + serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + ) + + client = await newTestWakuRecon( + clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + ) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffInWin + + for _ in 0 ..< diffInWin: + let (_, deliveredHashes) = await remoteNeeds.get() + check deliveredHashes in inWinHashes + check deliveredHashes notin outWinHashes + + asyncTest "hash-fingerprint collision, same timestamp – stable sort": + let ts = Timestamp(getNowInNanosecondTime()) + + var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + var msg2 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + msg2.payload[0] = msg2.payload[0] xor 0x01 + var h1 = computeMessageHash(DefaultPubsubTopic, msg1) + var h2 = computeMessageHash(DefaultPubsubTopic, msg2) + + for i in 0 ..< 8: + h2[i] = h1[i] + for i in 0 ..< 8: + check h1[i] == h2[i] + + check h1 != h2 + + server.messageIngress(h1, msg1) + client.messageIngress(h2, msg2) + + check remoteNeeds.len == 0 + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + + var vec = @[SyncID(time: ts, hash: h2), SyncID(time: ts, hash: h1)] + vec.shuffle() + vec.sort() + + let hFirst = vec[0].hash + let hSecond = vec[1].hash + check vec[0].time == ts and vec[1].time == ts + + asyncTest "malformed message-ID is ignored during reconciliation": + let nowTs = Timestamp(getNowInNanosecondTime()) + + let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic) + var goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg) + + var badHash: WakuMessageHash + for i in 0 ..< 32: + badHash[i] = 0'u8 + let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic) + + server.messageIngress(goodHash, goodMsg) + server.messageIngress(badHash, badMsg) + + check remoteNeeds.len == 0 + + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == goodHash + check neededHash != badHash + + asyncTest "malformed ID: future-timestamp msg is ignored": + let nowNs = getNowInNanosecondTime() + let tsNow = Timestamp(nowNs) + + let goodMsg = fakeWakuMessage(ts = tsNow, contentTopic = DefaultContentTopic) + let goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg) + + const tenYearsSec = 10 * 365 * 24 * 60 * 60 + let futureNs = nowNs + int64(tenYearsSec) * 1_000_000_000'i64 + let badTs = Timestamp(futureNs.uint64) + + let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic) + let badHash = computeMessageHash(DefaultPubsubTopic, badMsg) + + server.messageIngress(goodHash, goodMsg) + server.messageIngress(badHash, badMsg) + + check remoteNeeds.len == 0 + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == goodHash + check neededHash != badHash + + asyncTest "duplicate ID is queued only once": + let ts = Timestamp(getNowInNanosecondTime()) + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let h = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(h, msg) + server.messageIngress(h, msg) + check remoteNeeds.len == 0 + + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == h + + asyncTest "sync terminates immediately when no diffs exist": + let ts = Timestamp(getNowInNanosecondTime()) + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + client.messageIngress(hash, msg) + + let idsQ = newAsyncQueue[SyncID]() + let wantsQ = newAsyncQueue[PeerId]() + let needsQ = newAsyncQueue[(PeerId, Fingerprint)]() + + server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ) + client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check needsQ.len == 0 suite "Waku Sync: transfer": var @@ -396,3 +732,40 @@ suite "Waku Sync: transfer": check: response.messages.len > 0 + + asyncTest "Check the exact missing messages are received": + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 3) + + var ts = timeSlice.a + + let msgA = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + ts += Timestamp(part) + let msgB = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + ts += Timestamp(part) + let msgC = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + + let hA = computeMessageHash(DefaultPubsubTopic, msgA) + let hB = computeMessageHash(DefaultPubsubTopic, msgB) + let hC = computeMessageHash(DefaultPubsubTopic, msgC) + + discard serverDriver.put(DefaultPubsubTopic, @[msgA, msgB, msgC]) + discard clientDriver.put(DefaultPubsubTopic, @[msgA]) + + await serverRemoteNeeds.put((clientPeerInfo.peerId, hB)) + await serverRemoteNeeds.put((clientPeerInfo.peerId, hC)) + await clientLocalWants.put(serverPeerInfo.peerId) + + await sleepAsync(1.seconds) + check serverRemoteNeeds.len == 0 + + let sid1 = await clientIds.get() + let sid2 = await clientIds.get() + + let received = [sid1.hash, sid2.hash].toHashSet() + let expected = [hB, hC].toHashSet + + check received == expected + + check clientIds.len == 0 diff --git a/tests/waku_store_sync/test_range_split.nim b/tests/waku_store_sync/test_range_split.nim new file mode 100644 index 000000000..50ebc39fd --- /dev/null +++ b/tests/waku_store_sync/test_range_split.nim @@ -0,0 +1,242 @@ +import unittest, nimcrypto, std/sequtils, results +import ../../waku/waku_store_sync/[reconciliation, common] +import ../../waku/waku_store_sync/storage/seq_storage +import ../../waku/waku_core/message/digest + +proc toDigest(s: string): WakuMessageHash = + let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, (s.len - 1))) + var res: WakuMessageHash + for i in 0 .. 31: + res[i] = d.data[i] + return res + +proc `..`(a, b: SyncID): Slice[SyncID] = + Slice[SyncID](a: a, b: b) + +suite "Waku Sync – reconciliation": + test "fan-out: eight fingerprint sub-ranges for large slice": + const N = 2_048 + const mismatchI = 70 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseHashMismatch: WakuMessageHash + var remoteHashMismatch: WakuMessageHash + + for i in 0 ..< N: + let ts = 1000 + i + let hashLocal = toDigest("msg" & $i) + local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + assert false, "failed to insert hash: " & $error + + var hashRemote = hashLocal + if i == mismatchI: + baseHashMismatch = hashLocal + remoteHashMismatch = toDigest("msg" & $i & "_x") + hashRemote = remoteHashMismatch + remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + assert false, "failed to insert hash: " & $error + + var z: WakuMessageHash + let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) + + check local.computeFingerprint(whole) != remote.computeFingerprint(whole) + + let remoteFp = remote.computeFingerprint(whole) + let payload = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(whole, RangeType.Fingerprint)], + fingerprints: @[remoteFp], + itemSets: @[], + ) + + var toSend, toRecv: seq[WakuMessageHash] + let reply = local.processPayload(payload, toSend, toRecv) + + check reply.ranges.len == 8 + check reply.ranges.allIt(it[1] == RangeType.Fingerprint) + check reply.itemSets.len == 0 + check reply.fingerprints.len == 8 + + let mismTime = 1000 + mismatchI + var covered = false + for (slc, _) in reply.ranges: + if mismTime >= slc.a.time and mismTime <= slc.b.time: + covered = true + break + check covered + + check toSend.len == 0 + check toRecv.len == 0 + + test "splits mismatched fingerprint into two sub-ranges then item-set": + const threshold = 4 + const partitions = 2 + + let local = SeqStorage.new(@[], threshold = threshold, partitions = partitions) + let remote = SeqStorage.new(@[], threshold = threshold, partitions = partitions) + + var mismatchHash: WakuMessageHash + for i in 0 ..< 8: + let t = 1000 + i + let baseHash = toDigest("msg" & $i) + + var localHash = baseHash + var remoteHash = baseHash + + if i == 3: + mismatchHash = toDigest("msg" & $i & "_x") + localHash = mismatchHash + + discard local.insert (SyncID(time: t, hash: localHash)) + discard remote.insert(SyncID(time: t, hash: remoteHash)) + + var zeroHash: WakuMessageHash + let wholeRange = + SyncID(time: 1000, hash: zeroHash) .. SyncID(time: 1007, hash: zeroHash) + + var toSend, toRecv: seq[WakuMessageHash] + + let payload = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(wholeRange, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(wholeRange)], + itemSets: @[], + ) + + let reply = local.processPayload(payload, toSend, toRecv) + + check reply.ranges.len == partitions + check reply.itemSets.len == partitions + + check reply.itemSets.anyIt( + it.elements.anyIt(it.hash == mismatchHash and it.time == 1003) + ) + + test "second round when N =2048 & local ": + const N = 2_048 + const mismatchI = 70 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseHashMismatch, remoteHashMismatch: WakuMessageHash + + for i in 0 ..< N: + let ts = 1000 + i + let hashLocal = toDigest("msg" & $i) + local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + assert false, "failed to insert hash: " & $error + + var hashRemote = hashLocal + if i == mismatchI: + baseHashMismatch = hashLocal + remoteHashMismatch = toDigest("msg" & $i & "_x") + hashRemote = remoteHashMismatch + remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + assert false, "failed to insert hash: " & $error + + var zero: WakuMessageHash + let sliceWhole = + SyncID(time: 1000, hash: zero) .. SyncID(time: 1000 + N - 1, hash: zero) + check local.computeFingerprint(sliceWhole) != remote.computeFingerprint(sliceWhole) + + let payload1 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sliceWhole, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(sliceWhole)], + itemSets: @[], + ) + + var toSend, toRecv: seq[WakuMessageHash] + let reply1 = local.processPayload(payload1, toSend, toRecv) + + check reply1.ranges.len == 8 + check reply1.ranges.allIt(it[1] == RangeType.Fingerprint) + + let mismTime = 1000 + mismatchI + var subSlice: Slice[SyncID] + for (sl, _) in reply1.ranges: + if mismTime >= sl.a.time and mismTime <= sl.b.time: + subSlice = sl + break + check subSlice.a.time != 0 + + let payload2 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(subSlice, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(subSlice)], + itemSets: @[], + ) + + var toSend2, toRecv2: seq[WakuMessageHash] + let reply2 = local.processPayload(payload2, toSend2, toRecv2) + + check reply2.ranges.len == 8 + check reply2.ranges.allIt(it[1] == RangeType.ItemSet) + check reply2.itemSets.len == 8 + + var matchCount = 0 + for iset in reply2.itemSets: + if iset.elements.anyIt(it.time == mismTime and it.hash == baseHashMismatch): + inc matchCount + check not iset.elements.anyIt(it.hash == remoteHashMismatch) + check matchCount == 1 + + check toSend2.len == 0 + check toRecv2.len == 0 + + test "second-round payload remote": + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseHash: WakuMessageHash + var alteredHash: WakuMessageHash + + for i in 0 ..< 8: + let ts = 1000 + i + let hashLocal = toDigest("msg" & $i) + local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + assert false, "failed to insert hash: " & $error + + var hashRemote = hashLocal + if i == 3: + baseHash = hashLocal + alteredHash = toDigest("msg" & $i & "_x") + hashRemote = alteredHash + + remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + assert false, "failed to insert hash: " & $error + + var zero: WakuMessageHash + let slice = SyncID(time: 1000, hash: zero) .. SyncID(time: 1007, hash: zero) + + check local.computeFingerprint(slice) != remote.computeFingerprint(slice) + + var toSend1, toRecv1: seq[WakuMessageHash] + let pay1 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(slice, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(slice)], + itemSets: @[], + ) + let rep1 = local.processPayload(pay1, toSend1, toRecv1) + + check rep1.ranges.len == 1 + check rep1.ranges[0][1] == RangeType.ItemSet + check toSend1.len == 0 + check toRecv1.len == 0 + + var toSend2, toRecv2: seq[WakuMessageHash] + discard remote.processPayload(rep1, toSend2, toRecv2) + + check toSend2.len == 1 + check toSend2[0] == alteredHash + check toRecv2.len == 1 + check toRecv2[0] == baseHash diff --git a/tests/waku_store_sync/test_state_transition.nim b/tests/waku_store_sync/test_state_transition.nim new file mode 100644 index 000000000..732a577a9 --- /dev/null +++ b/tests/waku_store_sync/test_state_transition.nim @@ -0,0 +1,244 @@ +import unittest, nimcrypto, std/sequtils +import ../../waku/waku_store_sync/[reconciliation, common] +import ../../waku/waku_store_sync/storage/seq_storage +import ../../waku/waku_core/message/digest + +proc toDigest*(s: string): WakuMessageHash = + let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high)) + for i in 0 .. 31: + result[i] = d.data[i] + +proc `..`(a, b: SyncID): Slice[SyncID] = + Slice[SyncID](a: a, b: b) + +suite "Waku Sync – reconciliation": + test "Fingerprint → ItemSet → zero (default thresholds)": + const N = 2_000 + const idx = 137 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr = h + if i == idx: + baseH = h + altH = toDigest("msg" & $i & "x") + hr = altH + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) + + var s1, r1: seq[WakuMessageHash] + let p1 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(whole, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(whole)], + itemSets: @[], + ) + let rep1 = local.processPayload(p1, s1, r1) + check rep1.ranges.len == 8 + check rep1.ranges.allIt(it[1] == RangeType.Fingerprint) + + let mismT = 1000 + idx + let sub = + rep1.ranges.filterIt(mismT >= it[0].a.time and mismT <= it[0].b.time)[0][0] + + var s2, r2: seq[WakuMessageHash] + let p2 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sub, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(sub)], + itemSets: @[], + ) + let rep2 = local.processPayload(p2, s2, r2) + check rep2.ranges.len == 8 + check rep2.ranges.allIt(it[1] == RangeType.ItemSet) + + var s3, r3: seq[WakuMessageHash] + discard remote.processPayload(rep2, s3, r3) + check s3.len == 1 and s3[0] == altH + check r3.len == 1 and r3[0] == baseH + + discard local.insert(SyncID(time: mismT, hash: altH)) + discard remote.insert(SyncID(time: mismT, hash: baseH)) + + var s4, r4: seq[WakuMessageHash] + let p3 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sub, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(sub)], + itemSets: @[], + ) + let rep3 = local.processPayload(p3, s4, r4) + check rep3.ranges.len == 0 + check s4.len == 0 and r4.len == 0 + + test "test 2 ranges includes 1 skip": + const N = 120 + const pivot = 60 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var diffHash: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr: WakuMessageHash + if i >= pivot: + diffHash = toDigest("msg" & $i & "_x") + hr = diffHash + else: + hr = h + + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let sliceA = SyncID(time: 1000, hash: z) .. SyncID(time: 1059, hash: z) + let sliceB = SyncID(time: 1060, hash: z) .. SyncID(time: 1119, hash: z) + + var s, r: seq[WakuMessageHash] + let payload = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)], + fingerprints: + @[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)], + itemSets: @[], + ) + let reply = local.processPayload(payload, s, r) + + check reply.ranges.len == 2 + check reply.ranges[0][1] == RangeType.Skip + check reply.ranges[1][1] == RangeType.ItemSet + check reply.itemSets.len == 1 + check not reply.itemSets[0].elements.anyIt(it.hash == diffHash) + + test "custom threshold (50) → eight ItemSets first round": + const N = 300 + const idx = 123 + + let local = SeqStorage.new(capacity = N, threshold = 50, partitions = 8) + let remote = SeqStorage.new(capacity = N, threshold = 50, partitions = 8) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr = h + if i == idx: + baseH = h + altH = toDigest("msg" & $i & "_x") + hr = altH + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) + + var toS, toR: seq[WakuMessageHash] + let p = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(slice, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(slice)], + itemSets: @[], + ) + let reply = local.processPayload(p, toS, toR) + + check reply.ranges.len == 8 + check reply.ranges.allIt(it[1] == RangeType.ItemSet) + check reply.itemSets.len == 8 + + let mismT = 1000 + idx + var hit = 0 + for ist in reply.itemSets: + if ist.elements.anyIt(it.time == mismT and it.hash == baseH): + inc hit + check hit == 1 + + test "test N=80K,3FP,2IS,SKIP": + const N = 80_000 + const bad = N - 10 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + + let hr = + if i == bad: + baseH = h + altH = toDigest("msg" & $i & "_x") + altH + else: + h + discard remote.insert(SyncID(time: ts, hash: hr)) + + var slice = + SyncID(time: 1000, hash: EmptyFingerprint) .. + SyncID(time: 1000 + N - 1, hash: FullFingerprint) + + proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData = + local.processPayload( + RangesData( + cluster: 0, + shards: @[0], + ranges: @[(s, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(s)], + itemSets: @[], + ), + sendQ, + recvQ, + ) + + var tmpS, tmpR: seq[WakuMessageHash] + + for r in 1 .. 3: + let rep = fpReply(slice, tmpS, tmpR) + check rep.ranges.len == 8 + check rep.ranges.allIt(it[1] == RangeType.Fingerprint) + for (sl, _) in rep.ranges: + if local.computeFingerprint(sl) != remote.computeFingerprint(sl): + slice = sl + break + + let rep4 = fpReply(slice, tmpS, tmpR) + check rep4.ranges.len == 8 + check rep4.ranges.allIt(it[1] == RangeType.ItemSet) + for (sl, _) in rep4.ranges: + if sl.a.time <= 1000 + bad and sl.b.time >= 1000 + bad: + slice = sl + break + + var send5, recv5: seq[WakuMessageHash] + let rep5 = fpReply(slice, send5, recv5) + check rep5.ranges.len == 1 + check rep5.ranges[0][1] == RangeType.ItemSet + + var qSend, qRecv: seq[WakuMessageHash] + discard remote.processPayload(rep5, qSend, qRecv) + check qSend.len == 1 and qSend[0] == altH + check qRecv.len == 1 and qRecv[0] == baseH + + discard local.insert(SyncID(time: slice.a.time, hash: altH)) + discard remote.insert(SyncID(time: slice.a.time, hash: baseH)) + + var send6, recv6: seq[WakuMessageHash] + let rep6 = fpReply(slice, send6, recv6) + check rep6.ranges.len == 0 + check send6.len == 0 and recv6.len == 0 diff --git a/vendor/waku-rlnv2-contract b/vendor/waku-rlnv2-contract index a576a8949..b7e9a9b1b 160000 --- a/vendor/waku-rlnv2-contract +++ b/vendor/waku-rlnv2-contract @@ -1 +1 @@ -Subproject commit a576a8949ca20e310f2fbb4ec0bd05a57ac3045f +Subproject commit b7e9a9b1bc69256a2a3076c1f099b50ce84e7eff diff --git a/waku.nimble b/waku.nimble index 5be212264..2f9a73595 100644 --- a/waku.nimble +++ b/waku.nimble @@ -1,3 +1,4 @@ +import os mode = ScriptMode.Verbose ### Package @@ -69,9 +70,15 @@ proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " & extra_params & " " & srcDir & name & ".nim" else: - exec "nim c" & " --out:build/" & name & - ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " & - extra_params & " " & srcDir & name & ".nim" + var lib_name = toDll("libwaku") + when defined(windows): + exec "nim c" & " --out:build/" & lib_name & + " --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:off -d:discv5_protocol_id=d5waku " & + extra_params & " " & srcDir & name & ".nim" + else: + exec "nim c" & " --out:build/" & lib_name & + " --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " & + extra_params & " " & srcDir & name & ".nim" proc buildMobileAndroid(srcDir = ".", params = "") = let cpu = getEnv("CPU") @@ -160,33 +167,20 @@ task testone, "Test custom target": exec "build/" & filepath & ".bin" ### C Bindings +let chroniclesParams = + "-d:chronicles_line_numbers " & "-d:chronicles_runtime_filtering=on " & + """-d:chronicles_sinks="textlines,json" """ & + "-d:chronicles_default_output_device=Dynamic " & + """-d:chronicles_disabled_topics="eth,dnsdisc.client" """ & "--warning:Deprecated:off " & + "--warning:UnusedImport:on " & "-d:chronicles_log_level=TRACE" + task libwakuStatic, "Build the cbindings waku node library": let name = "libwaku" - buildLibrary name, - "library/", - """-d:chronicles_line_numbers \ - -d:chronicles_runtime_filtering=on \ - -d:chronicles_sinks="textlines,json" \ - -d:chronicles_default_output_device=Dynamic \ - -d:chronicles_disabled_topics="eth,dnsdisc.client" \ - --warning:Deprecated:off \ - --warning:UnusedImport:on \ - -d:chronicles_log_level=TRACE """, - "static" + buildLibrary name, "library/", chroniclesParams, "static" task libwakuDynamic, "Build the cbindings waku node library": let name = "libwaku" - buildLibrary name, - "library/", - """-d:chronicles_line_numbers \ - -d:chronicles_runtime_filtering=on \ - -d:chronicles_sinks="textlines,json" \ - -d:chronicles_default_output_device=Dynamic \ - -d:chronicles_disabled_topics="eth,dnsdisc.client" \ - --warning:Deprecated:off \ - --warning:UnusedImport:on \ - -d:chronicles_log_level=TRACE """, - "dynamic" + buildLibrary name, "library/", chroniclesParams, "dynamic" ### Mobile Android task libWakuAndroid, "Build the mobile bindings for Android": diff --git a/waku/factory/conf_builder/discv5_conf_builder.nim b/waku/factory/conf_builder/discv5_conf_builder.nim index 30755669b..e2729021e 100644 --- a/waku/factory/conf_builder/discv5_conf_builder.nim +++ b/waku/factory/conf_builder/discv5_conf_builder.nim @@ -38,6 +38,9 @@ proc withTableIpLimit*(b: var Discv5ConfBuilder, tableIpLimit: uint) = proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: Port) = b.udpPort = some(udpPort) +proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: uint) = + b.udpPort = some(Port(udpPort.uint16)) + proc withBootstrapNodes*(b: var Discv5ConfBuilder, bootstrapNodes: seq[string]) = # TODO: validate ENRs? b.bootstrapNodes = concat(b.bootstrapNodes, bootstrapNodes) diff --git a/waku/factory/conf_builder/dns_discovery_conf_builder.nim b/waku/factory/conf_builder/dns_discovery_conf_builder.nim index dbb2c5fd3..34337c9b1 100644 --- a/waku/factory/conf_builder/dns_discovery_conf_builder.nim +++ b/waku/factory/conf_builder/dns_discovery_conf_builder.nim @@ -21,6 +21,9 @@ proc withEnabled*(b: var DnsDiscoveryConfBuilder, enabled: bool) = proc withEnrTreeUrl*(b: var DnsDiscoveryConfBuilder, enrTreeUrl: string) = b.enrTreeUrl = some(enrTreeUrl) +proc withNameServers*(b: var DnsDiscoveryConfBuilder, nameServers: seq[IpAddress]) = + b.nameServers = nameServers + proc build*(b: DnsDiscoveryConfBuilder): Result[Option[DnsDiscoveryConf], string] = if not b.enabled.get(false): return ok(none(DnsDiscoveryConf)) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index a9e828893..190ce46e7 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -1005,6 +1005,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.dnsDiscoveryConf.withEnabled(n.dnsDiscovery) b.dnsDiscoveryConf.withEnrTreeUrl(n.dnsDiscoveryUrl) + b.dnsDiscoveryConf.withNameServers(n.dnsAddrsNameServers) if n.discv5Discovery.isSome(): b.discv5Conf.withEnabled(n.discv5Discovery.get()) diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index 619a1a7c5..9d1da0ace 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -30,12 +30,12 @@ type ClusterConf* = object # Cluster configuration corresponding to The Waku Network. Note that it # overrides existing cli configuration proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = - const RelayChainId = 11155111'u256 + const RelayChainId = 59141'u256 return ClusterConf( maxMessageSize: "150KiB", clusterId: 1, rlnRelay: true, - rlnRelayEthContractAddress: "0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8", + rlnRelayEthContractAddress: "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6", rlnRelayDynamic: true, rlnRelayChainId: RelayChainId, rlnEpochSizeSec: 600, diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 54290a77a..4f2fb5228 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -30,23 +30,29 @@ logScope: # using the when predicate does not work within the contract macro, hence need to dupe contract(WakuRlnContract): # this serves as an entrypoint into the rln membership set - proc register(idCommitment: UInt256, userMessageLimit: UInt32) + proc register( + idCommitment: UInt256, userMessageLimit: UInt32, idCommitmentsToErase: seq[UInt256] + ) + # Initializes the implementation contract (only used in unit tests) proc initialize(maxMessageLimit: UInt256) - # this event is raised when a new member is registered - proc MemberRegistered(rateCommitment: UInt256, index: UInt32) {.event.} + # this event is emitted when a new member is registered + proc MembershipRegistered( + idCommitment: UInt256, membershipRateLimit: UInt256, index: UInt32 + ) {.event.} + # this function denotes existence of a given user - proc memberExists(idCommitment: UInt256): UInt256 {.view.} + proc isInMembershipSet(idCommitment: Uint256): bool {.view.} # this constant describes the next index of a new member - proc commitmentIndex(): UInt256 {.view.} + proc nextFreeIndex(): UInt256 {.view.} # this constant describes the block number this contract was deployed on proc deployedBlockNumber(): UInt256 {.view.} # this constant describes max message limit of rln contract - proc MAX_MESSAGE_LIMIT(): UInt256 {.view.} - # this function returns the merkleProof for a given index - # proc merkleProofElements(index: UInt40): seq[byte] {.view.} - # this function returns the merkle root - proc root(): UInt256 {.view.} + proc maxMembershipRateLimit(): UInt256 {.view.} + # this function returns the merkleProof for a given index + # proc getMerkleProof(index: EthereumUInt40): seq[array[32, byte]] {.view.} + # this function returns the Merkle root + proc root(): Uint256 {.view.} type WakuRlnContractWithSender = Sender[WakuRlnContract] @@ -67,11 +73,7 @@ type proc setMetadata*( g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber) ): GroupManagerResult[void] = - let normalizedBlock = - if lastProcessedBlock.isSome(): - lastProcessedBlock.get() - else: - g.latestProcessedBlock + let normalizedBlock = lastProcessedBlock.get(g.latestProcessedBlock) try: let metadataSetRes = g.rlnInstance.setMetadata( RlnMetadata( @@ -87,14 +89,68 @@ proc setMetadata*( return err("failed to persist rln metadata: " & getCurrentExceptionMsg()) return ok() +proc sendEthCallWithChainId( + ethRpc: Web3, + functionSignature: string, + fromAddress: Address, + toAddress: Address, + chainId: UInt256, +): Future[Result[UInt256, string]] {.async.} = + ## Workaround for web3 chainId=null issue on some networks (e.g., linea-sepolia) + ## Makes contract calls with explicit chainId for view functions with no parameters + let functionHash = + keccak256.digest(functionSignature.toOpenArrayByte(0, functionSignature.len - 1)) + let functionSelector = functionHash.data[0 .. 3] + let dataSignature = "0x" & functionSelector.mapIt(it.toHex(2)).join("") + + var tx: TransactionArgs + tx.`from` = Opt.some(fromAddress) + tx.to = Opt.some(toAddress) + tx.value = Opt.some(0.u256) + tx.data = Opt.some(byteutils.hexToSeqByte(dataSignature)) + tx.chainId = Opt.some(chainId) + + let resultBytes = await ethRpc.provider.eth_call(tx, "latest") + if resultBytes.len == 0: + return err("No result returned for function call: " & functionSignature) + return ok(UInt256.fromBytesBE(resultBytes)) + +proc sendEthCallWithParams( + ethRpc: Web3, + functionSignature: string, + params: seq[byte], + fromAddress: Address, + toAddress: Address, + chainId: UInt256, +): Future[Result[seq[byte], string]] {.async.} = + ## Workaround for web3 chainId=null issue with parameterized contract calls + let functionHash = + keccak256.digest(functionSignature.toOpenArrayByte(0, functionSignature.len - 1)) + let functionSelector = functionHash.data[0 .. 3] + let callData = functionSelector & params + + var tx: TransactionArgs + tx.`from` = Opt.some(fromAddress) + tx.to = Opt.some(toAddress) + tx.value = Opt.some(0.u256) + tx.data = Opt.some(callData) + tx.chainId = Opt.some(chainId) + + let resultBytes = await ethRpc.provider.eth_call(tx, "latest") + return ok(resultBytes) + proc fetchMerkleProofElements*( g: OnchainGroupManager ): Future[Result[seq[byte], string]] {.async.} = try: + # let merkleRootInvocation = g.wakuRlnContract.get().root() + # let merkleRoot = await merkleRootInvocation.call() + # The above code is not working with the latest web3 version due to chainId being null (specifically on linea-sepolia) + # TODO: find better solution than this custom sendEthCallWithChainId call let membershipIndex = g.membershipIndex.get() let index40 = stuint(membershipIndex, 40) - let methodSig = "merkleProofElements(uint40)" + let methodSig = "getMerkleProof(uint40)" let methodIdDigest = keccak.keccak256.digest(methodSig) let methodId = methodIdDigest.data[0 .. 3] @@ -111,6 +167,7 @@ proc fetchMerkleProofElements*( var tx: TransactionArgs tx.to = Opt.some(fromHex(Address, g.ethContractAddress)) tx.data = Opt.some(callData) + tx.chainId = Opt.some(g.chainId) # Explicitly set the chain ID let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest") @@ -123,8 +180,17 @@ proc fetchMerkleRoot*( g: OnchainGroupManager ): Future[Result[UInt256, string]] {.async.} = try: - let merkleRootInvocation = g.wakuRlnContract.get().root() - let merkleRoot = await merkleRootInvocation.call() + let merkleRoot = ( + await sendEthCallWithChainId( + ethRpc = g.ethRpc.get(), + functionSignature = "root()", + fromAddress = g.ethRpc.get().defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) + ).valueOr: + error "Failed to fetch Merkle root", error = $error + return err("Failed to fetch merkle root: " & $error) return ok(merkleRoot) except CatchableError: error "Failed to fetch Merkle root", error = getCurrentExceptionMsg() @@ -151,6 +217,7 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = return false let merkleRoot = UInt256ToField(rootRes.get()) + if g.validRoots.len == 0: g.validRoots.addLast(merkleRoot) return true @@ -183,8 +250,26 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError error "Failed to fetch Merkle proof", error = proofResult.error g.merkleProofCache = proofResult.get() - # also need update registerd membership - let memberCount = cast[int64](await wakuRlnContract.commitmentIndex().call()) + # also need to update registered membership + # g.rlnRelayMaxMessageLimit = + # cast[uint64](await wakuRlnContract.nextFreeIndex().call()) + # The above code is not working with the latest web3 version due to chainId being null (specifically on linea-sepolia) + # TODO: find better solution than this custom sendEthCallWithChainId call + let nextFreeIndex = await sendEthCallWithChainId( + ethRpc = ethRpc, + functionSignature = "nextFreeIndex()", + fromAddress = ethRpc.defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) + + if nextFreeIndex.isErr(): + error "Failed to fetch next free index", error = nextFreeIndex.error + raise newException( + CatchableError, "Failed to fetch next free index: " & nextFreeIndex.error + ) + + let memberCount = cast[int64](nextFreeIndex.get()) waku_rln_number_registered_memberships.set(float64(memberCount)) await sleepAsync(rpcDelay) @@ -219,15 +304,19 @@ method register*( var gasPrice: int g.retryWrapper(gasPrice, "Failed to get gas price"): int(await ethRpc.provider.eth_gasPrice()) * 2 + let idCommitmentHex = identityCredential.idCommitment.inHex() + debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex let idCommitment = identityCredential.idCommitment.toUInt256() - + let idCommitmentsToErase: seq[UInt256] = @[] debug "registering the member", - idCommitment = idCommitment, userMessageLimit = userMessageLimit + idCommitment = idCommitment, + userMessageLimit = userMessageLimit, + idCommitmentsToErase = idCommitmentsToErase var txHash: TxHash g.retryWrapper(txHash, "Failed to register the member"): - await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send( - gasPrice = gasPrice - ) + await wakuRlnContract + .register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase) + .send(gasPrice = gasPrice) # wait for the transaction to be mined var tsReceipt: ReceiptObject @@ -240,27 +329,29 @@ method register*( debug "ts receipt", receipt = tsReceipt[] if tsReceipt.status.isNone(): - raise newException(ValueError, "register: transaction failed status is None") + raise newException(ValueError, "Transaction failed: status is None") if tsReceipt.status.get() != 1.Quantity: raise newException( - ValueError, "register: transaction failed status is: " & $tsReceipt.status.get() + ValueError, "Transaction failed with status: " & $tsReceipt.status.get() ) - let firstTopic = tsReceipt.logs[0].topics[0] - # the hash of the signature of MemberRegistered(uint256,uint32) event is equal to the following hex value - if firstTopic != - cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data): + ## Extract MembershipRegistered event from transaction logs (third event) + let thirdTopic = tsReceipt.logs[2].topics[0] + debug "third topic", thirdTopic = thirdTopic + if thirdTopic != + cast[FixedBytes[32]](keccak.keccak256.digest( + "MembershipRegistered(uint256,uint256,uint32)" + ).data): raise newException(ValueError, "register: unexpected event signature") - # the arguments of the raised event i.e., MemberRegistered are encoded inside the data field - # data = rateCommitment encoded as 256 bits || index encoded as 32 bits - let arguments = tsReceipt.logs[0].data + ## Parse MembershipRegistered event data: rateCommitment(256) || membershipRateLimit(256) || index(32) + let arguments = tsReceipt.logs[2].data debug "tx log data", arguments = arguments let - # In TX log data, uints are encoded in big endian - membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1]) + ## Extract membership index from transaction log data (big endian) + membershipIndex = UInt256.fromBytesBE(arguments[64 .. 95]) - debug "parsed membershipIndex", membershipIndex + trace "parsed membershipIndex", membershipIndex g.userMessageLimit = some(userMessageLimit) g.membershipIndex = some(membershipIndex.toMembershipIndex()) g.idCredentials = some(identityCredential) @@ -376,7 +467,7 @@ method generateProof*( var proofValue = cast[ptr array[320, byte]](output_witness_buffer.`ptr`) let proofBytes: array[320, byte] = proofValue[] - ## parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ] + ## Parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ] let proofOffset = 128 rootOffset = proofOffset + 32 @@ -418,9 +509,7 @@ method generateProof*( return ok(output) method verifyProof*( - g: OnchainGroupManager, # verifier context - input: seq[byte], # raw message data (signal) - proof: RateLimitProof, # proof received from the peer + g: OnchainGroupManager, input: seq[byte], proof: RateLimitProof ): GroupManagerResult[bool] {.gcsafe, raises: [].} = ## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots -- @@ -543,11 +632,31 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} g.membershipIndex = some(keystoreCred.treeIndex) g.userMessageLimit = some(keystoreCred.userMessageLimit) # now we check on the contract if the commitment actually has a membership + let idCommitmentBytes = keystoreCred.identityCredential.idCommitment + let idCommitmentUInt256 = keystoreCred.identityCredential.idCommitment.toUInt256() + let idCommitmentHex = idCommitmentBytes.inHex() + debug "Keystore idCommitment in bytes", idCommitmentBytes = idCommitmentBytes + debug "Keystore idCommitment in UInt256 ", idCommitmentUInt256 = idCommitmentUInt256 + debug "Keystore idCommitment in hex ", idCommitmentHex = idCommitmentHex + let idCommitment = idCommitmentUInt256 try: - let membershipExists = await wakuRlnContract - .memberExists(keystoreCred.identityCredential.idCommitment.toUInt256()) - .call() - if membershipExists == 0: + let commitmentBytes = keystoreCred.identityCredential.idCommitment + let params = commitmentBytes.reversed() + let resultBytes = await sendEthCallWithParams( + ethRpc = g.ethRpc.get(), + functionSignature = "isInMembershipSet(uint256)", + params = params, + fromAddress = ethRpc.defaultAccount, + toAddress = contractAddress, + chainId = g.chainId, + ) + if resultBytes.isErr(): + return err("Failed to check membership: " & resultBytes.error) + let responseBytes = resultBytes.get() + let membershipExists = responseBytes.len == 32 and responseBytes[^1] == 1'u8 + + debug "membershipExists", membershipExists = membershipExists + if membershipExists == false: return err("the commitment does not have a membership") except CatchableError: return err("failed to check if the commitment has a membership") @@ -564,8 +673,18 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} if metadata.contractAddress != g.ethContractAddress.toLower(): return err("persisted data: contract address mismatch") - g.rlnRelayMaxMessageLimit = - cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call()) + let maxMembershipRateLimit = ( + await sendEthCallWithChainId( + ethRpc = ethRpc, + functionSignature = "maxMembershipRateLimit()", + fromAddress = ethRpc.defaultAccount, + toAddress = contractAddress, + chainId = g.chainId, + ) + ).valueOr: + return err("Failed to fetch max membership rate limit: " & $error) + + g.rlnRelayMaxMessageLimit = cast[uint64](maxMembershipRateLimit) proc onDisconnect() {.async.} = error "Ethereum client disconnected" diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 01000935b..0601d2c23 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -122,7 +122,7 @@ proc processRequest( roundTrips.inc() - debug "sync payload received", + trace "sync payload received", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = recvPayload @@ -141,7 +141,7 @@ proc processRequest( recvPayload.shards.toPackedSet() == self.shards: sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) - debug "sync payload processed", + trace "sync payload processed", hash_to_send = hashToSend, hash_to_recv = hashToRecv sendPayload.cluster = self.cluster @@ -166,7 +166,7 @@ proc processRequest( return err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) - debug "sync payload sent", + trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = sendPayload @@ -217,7 +217,7 @@ proc initiate( "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg ) - debug "sync payload sent", + trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, payload = initPayload