Merge branch 'master' into add_shard_metrics

This commit is contained in:
Darshan K 2025-06-26 11:41:55 +05:30 committed by GitHub
commit f6c571f093
30 changed files with 1824 additions and 427 deletions

View File

@ -81,9 +81,13 @@ jobs:
make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1
cd ../../../../
- name: Building wakunode2
- name: Building wakunode2.exe
run: |
make wakunode2 LOG_LEVEL=DEBUG V=3 -j8
- name: Building libwaku.dll
run: |
make libwaku STATIC=0 LOG_LEVEL=DEBUG V=1 -j
- name: Check Executable
run: |
@ -93,3 +97,9 @@ jobs:
echo "Build failed: wakunode2.exe not found"
exit 1
fi
if [ -f "./build/libwaku.dll" ]; then
echo "libwaku.dll build successful"
else
echo "Build failed: libwaku.dll not found"
exit 1
fi

View File

@ -112,11 +112,8 @@ ifeq (, $(shell which cargo))
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable
endif
anvil: rustup
ifeq (, $(shell which anvil 2> /dev/null))
# Install Anvil if it's not installed
./scripts/install_anvil.sh
endif
rln-deps: rustup
./scripts/install_rln_tests_dependencies.sh
deps: | deps-common nat-libs waku.nims
@ -205,8 +202,8 @@ testcommon: | build deps
##########
.PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester
# install anvil only for the testwaku target
testwaku: | build deps anvil librln
# install rln-deps only for the testwaku target
testwaku: | build deps rln-deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims
@ -401,14 +398,16 @@ docker-liteprotocoltester-push:
STATIC ?= 0
libwaku: | build deps librln
rm -f build/libwaku*
rm -f build/libwaku*
ifeq ($(STATIC), 1)
echo -e $(BUILD_MSG) "build/$@.a" && \
$(ENV_SCRIPT) nim libwakuStatic $(NIM_PARAMS) waku.nims
echo -e $(BUILD_MSG) "build/$@.a" && $(ENV_SCRIPT) nim libwakuStatic $(NIM_PARAMS) waku.nims
else ifeq ($(detected_OS),Windows)
echo -e $(BUILD_MSG) "build/$@.dll" && $(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims
else
echo -e $(BUILD_MSG) "build/$@.so" && \
$(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims
echo -e $(BUILD_MSG) "build/$@.so" && $(ENV_SCRIPT) nim libwakuDynamic $(NIM_PARAMS) waku.nims
endif
#####################

View File

@ -87,7 +87,7 @@ pacman -S --noconfirm --needed mingw-w64-x86_64-python
#### 3. Build Wakunode
- Open Git Bash as administrator
- clone nwaku and cd nwaku
- Execute: `./scripts/build_wakunode_windows.sh`
- Execute: `./scripts/build_windows.sh`
#### 4. Troubleshooting
If `wakunode2.exe` isn't generated:

View File

@ -9,7 +9,7 @@ x-logging: &logging
x-rln-relay-eth-client-address: &rln_relay_eth_client_address ${RLN_RELAY_ETH_CLIENT_ADDRESS:-} # Add your RLN_RELAY_ETH_CLIENT_ADDRESS after the "-"
x-rln-environment: &rln_env
RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8}
RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xB9cd878C90E49F797B4431fBF4fb333108CB90e6}
RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-"
RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-"

View File

@ -24,7 +24,7 @@ fi
docker run -v $(pwd)/keystore:/keystore/:Z harbor.status.im/wakuorg/nwaku:v0.30.1 generateRlnKeystore \
--rln-relay-eth-client-address=${RLN_RELAY_ETH_CLIENT_ADDRESS} \
--rln-relay-eth-private-key=${ETH_TESTNET_KEY} \
--rln-relay-eth-contract-address=0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8 \
--rln-relay-eth-contract-address=0xB9cd878C90E49F797B4431fBF4fb333108CB90e6 \
--rln-relay-cred-path=/keystore/keystore.json \
--rln-relay-cred-password="${RLN_RELAY_CRED_PASSWORD}" \
--rln-relay-user-message-limit=20 \

View File

@ -0,0 +1,9 @@
import system, std/json, ./json_base_event
type JsonWakuNotRespondingEvent* = ref object of JsonEvent
proc new*(T: type JsonWakuNotRespondingEvent): T =
return JsonWakuNotRespondingEvent(eventType: "waku_not_responding")
method `$`*(event: JsonWakuNotRespondingEvent): string =
$(%*event)

View File

@ -45,6 +45,8 @@ int waku_version(void* ctx,
WakuCallBack callback,
void* userData);
// Sets a callback that will be invoked whenever an event occurs.
// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe.
void waku_set_event_callback(void* ctx,
WakuCallBack callback,
void* userData);

View File

@ -15,8 +15,7 @@ import
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay,
./events/
[json_message_event, json_topic_health_change_event, json_connection_change_event],
./events/json_message_event,
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
@ -48,25 +47,6 @@ template checkLibwakuParams*(
if isNil(callback):
return RET_MISSING_CALLBACK
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
foreignThreadGc:
try:
let event = body
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
proc handleRequest(
ctx: ptr WakuContext,
requestType: RequestType,
@ -81,21 +61,6 @@ proc handleRequest(
return RET_OK
proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
### End of not-exported components
################################################################################
@ -146,8 +111,8 @@ proc waku_new(
return nil
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_thread.createWakuThread().valueOr:
let msg = "Error in createWakuThread: " & $error
var ctx = waku_thread.createWakuContext().valueOr:
let msg = "Error in createWakuContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
@ -180,7 +145,7 @@ proc waku_destroy(
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread.destroyWakuThread(ctx).isOkOr:
waku_thread.destroyWakuContext(ctx).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

View File

@ -18,6 +18,7 @@ type DebugNodeMsgType* = enum
RETRIEVE_MY_PEER_ID
RETRIEVE_METRICS
RETRIEVE_ONLINE_STATE
CHECK_WAKU_NOT_BLOCKED
type DebugNodeRequest* = object
operation: DebugNodeMsgType
@ -55,6 +56,8 @@ proc process*(
return ok(getMetrics())
of RETRIEVE_ONLINE_STATE:
return ok($waku.healthMonitor.onlineMonitor.amIOnline())
of CHECK_WAKU_NOT_BLOCKED:
return ok("waku thread is not blocked")
error "unsupported operation in DebugNodeRequest"
return err("unsupported operation in DebugNodeRequest")

View File

@ -4,10 +4,22 @@
import std/[options, atomics, os, net, locks]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
import
waku/factory/waku,
waku/node/peer_manager,
waku/waku_relay/[protocol, topic_health],
waku/waku_core/[topics/pubsub_topic, message],
./inter_thread_communication/[waku_thread_request, requests/debug_node_request],
../ffi_types,
../events/[
json_message_event, json_topic_health_change_event, json_connection_change_event,
json_waku_not_responding_event,
]
type WakuContext* = object
thread: Thread[(ptr WakuContext)]
wakuThread: Thread[(ptr WakuContext)]
watchdogThread: Thread[(ptr WakuContext)]
# monitors the Waku thread and notifies the Waku SDK consumer if it hangs
lock: Lock
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
reqSignal: ThreadSignalPtr
@ -17,78 +29,48 @@ type WakuContext* = object
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the thread is running
running: Atomic[bool] # To control when the threads are running
const git_version* {.strdefine.} = "n/a"
const versionString = "version / git commit hash: " & waku.git_version
proc runWaku(ctx: ptr WakuContext) {.async.} =
## This is the worker body. This runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
var waku: Waku
foreignThreadGc:
try:
let event = body
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
while true:
await ctx.reqSignal.wait()
proc onConnectionChange*(ctx: ptr WakuContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
if ctx.running.load == false:
break
proc onReceivedMessage*(ctx: ptr WakuContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
## Trying to get a request from the libwaku requestor thread
var request: ptr WakuThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "waku thread could not receive a request"
continue
proc onTopicHealthChange*(ctx: ptr WakuContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Handle the request
asyncSpawn WakuThreadRequest.process(request, addr waku)
proc run(ctx: ptr WakuContext) {.thread.} =
## Launch waku worker
waitFor runWaku(ctx)
proc createWakuThread*(): Result[ptr WakuContext, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
var ctx = createShared(WakuContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ValueError, ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroyWakuThread: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroyWakuThread")
joinThread(ctx.thread)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()
proc onWakuNotResponding*(ctx: ptr WakuContext) =
callEventCallback(ctx, "onWakuNotResponsive"):
$JsonWakuNotRespondingEvent.new()
proc sendRequestToWakuThread*(
ctx: ptr WakuContext,
@ -96,16 +78,17 @@ proc sendRequestToWakuThread*(
reqContent: pointer,
callback: WakuCallBack,
userData: pointer,
timeout = InfiniteDuration,
): Result[void, string] =
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
ctx.lock.acquire()
# This lock is only necessary while we use a SP Channel and while the signalling
# between threads assumes that there aren't concurrent requests.
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
# requests concurrently and spare us the need of locks
ctx.lock.acquire()
defer:
ctx.lock.release()
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
@ -122,11 +105,115 @@ proc sendRequestToWakuThread*(
return err("Couldn't fireSync in time")
## wait until the Waku Thread properly received the request
let res = ctx.reqReceivedSignal.waitSync()
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
deallocShared(req)
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
## process proc.
## process proc. See the 'waku_thread_request.nim' module for more details.
ok()
proc watchdogThreadBody(ctx: ptr WakuContext) {.thread.} =
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
let watchdogRun = proc(ctx: ptr WakuContext) {.async.} =
const WatchdogTimeinterval = 1.seconds
const WakuNotRespondingTimeout = 3.seconds
while true:
await sleepAsync(WatchdogTimeinterval)
if ctx.running.load == false:
debug "Watchdog thread exiting because WakuContext is not running"
break
let wakuCallback = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard ## Don't do anything. Just respecting the callback signature.
const nilUserData = nil
trace "Sending watchdog request to Waku thread"
sendRequestToWakuThread(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
wakuCallback,
nilUserData,
WakuNotRespondingTimeout,
).isOkOr:
error "Failed to send watchdog request to Waku thread", error = $error
onWakuNotResponding(ctx)
waitFor watchdogRun(ctx)
proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} =
## Waku thread that attends library user requests (stop, connect_to, etc.)
let wakuRun = proc(ctx: ptr WakuContext) {.async.} =
var waku: Waku
while true:
await ctx.reqSignal.wait()
if ctx.running.load == false:
break
## Trying to get a request from the libwaku requestor thread
var request: ptr WakuThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "waku thread could not receive a request"
continue
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Handle the request
asyncSpawn WakuThreadRequest.process(request, addr waku)
waitFor wakuRun(ctx)
proc createWakuContext*(): Result[ptr WakuContext, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
var ctx = createShared(WakuContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.running.store(true)
try:
createThread(ctx.wakuThread, wakuThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
try:
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc destroyWakuContext*(ctx: ptr WakuContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroyWakuContext: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroyWakuContext")
joinThread(ctx.wakuThread)
joinThread(ctx.watchdogThread)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()

View File

@ -36,25 +36,28 @@ cd ../../../..
echo "6. -.-.-.- Building libunwind -.-.-.-"
cd vendor/nim-libbacktrace
execute_command "make all V=1"
execute_command "make install/usr/lib/libunwind.a V=1"
execute_command "make all V=1 -j8"
execute_command "make install/usr/lib/libunwind.a V=1 -j8"
cp ./vendor/libunwind/build/lib/libunwind.a install/usr/lib
cd ../../
echo "7. -.-.-.- Building miniupnpc -.-.-.- "
cd vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc
execute_command "git checkout little_chore_windows_support"
execute_command "make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1"
execute_command "make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1 -j8"
cd ../../../../..
echo "8. -.-.-.- Building libnatpmp -.-.-.- "
cd ./vendor/nim-nat-traversal/vendor/libnatpmp-upstream
make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1
make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1 -j8
cd ../../../../
echo "9. -.-.-.- Building wakunode2 -.-.-.- "
execute_command "make wakunode2 LOG_LEVEL=DEBUG V=1 -j8"
echo "10. -.-.-.- Building libwaku -.-.-.- "
execute_command "make libwaku STATIC=0 LOG_LEVEL=DEBUG V=1 -j8"
echo "Windows setup completed successfully!"
echo "✓ Successful commands: $success_count"
echo "✗ Failed commands: $failure_count"

View File

@ -2,13 +2,14 @@
# Install Anvil
if ! command -v anvil &> /dev/null; then
BASE_DIR="${XDG_CONFIG_HOME:-$HOME}"
FOUNDRY_DIR="${FOUNDRY_DIR:-"$BASE_DIR/.foundry"}"
FOUNDRY_BIN_DIR="$FOUNDRY_DIR/bin"
BASE_DIR="${XDG_CONFIG_HOME:-$HOME}"
FOUNDRY_DIR="${FOUNDRY_DIR-"$BASE_DIR/.foundry"}"
FOUNDRY_BIN_DIR="$FOUNDRY_DIR/bin"
curl -L https://foundry.paradigm.xyz | bash
# Extract the source path from the download result
echo "foundryup_path: $FOUNDRY_BIN_DIR"
# run foundryup
$FOUNDRY_BIN_DIR/foundryup
curl -L https://foundry.paradigm.xyz | bash
# Extract the source path from the download result
echo "foundryup_path: $FOUNDRY_BIN_DIR"
# run foundryup
$FOUNDRY_BIN_DIR/foundryup
fi

8
scripts/install_pnpm.sh Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
# Install pnpm
if ! command -v pnpm &> /dev/null; then
echo "pnpm is not installed, installing it now..."
npm i pnpm --global
fi

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# Install Anvil
./scripts/install_anvil.sh
#Install pnpm
./scripts/install_pnpm.sh

View File

@ -108,4 +108,4 @@ import
import ./waku_rln_relay/test_all
# Node Factory
import ./factory/test_external_config
import ./factory/[test_external_config, test_node_factory, test_waku_conf]

View File

@ -9,7 +9,7 @@ import
testutils/unittests
import
waku/factory/waku_conf,
waku/factory/waku_conf_builder,
waku/factory/conf_builder/conf_builder,
waku/factory/networks_config,
waku/common/utils/parse_size_units
@ -24,7 +24,7 @@ suite "Waku Conf - build with cluster conf":
let expectedShards = toSeq[0.uint16 .. 7.uint16]
## Given
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withRelay(true)
builder.rlnRelayConf.withTreePath("/tmp/test-tree-path")
@ -65,7 +65,7 @@ suite "Waku Conf - build with cluster conf":
let expectedShards = toSeq[0.uint16 .. 7.uint16]
## Given
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withRelay(false)
@ -95,7 +95,7 @@ suite "Waku Conf - build with cluster conf":
expectedShards = toSeq[0.uint16 .. 7.uint16]
## Given
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.rlnRelayConf.withEnabled(false)
@ -122,7 +122,7 @@ suite "Waku Conf - build with cluster conf":
let shards = @[2.uint16, 3.uint16]
## Given
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withShards(shards)
@ -148,7 +148,7 @@ suite "Waku Conf - build with cluster conf":
let shards = @[2.uint16, 10.uint16]
## Given
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withShards(shards)
@ -158,11 +158,11 @@ suite "Waku Conf - build with cluster conf":
## Then
assert resConf.isErr(), "Invalid shard was accepted"
test "Cluster Conf is passed and RLN contract is overridden":
test "Cluster Conf is passed and RLN contract is **not** overridden":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
builder.rlnRelayConf.withEthClientAddress("https://my_eth_rpc_url/")
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
# Mount all shards in network
let expectedShards = toSeq[0.uint16 .. 7.uint16]
@ -194,7 +194,8 @@ suite "Waku Conf - build with cluster conf":
assert conf.rlnRelayConf.isSome
let rlnRelayConf = conf.rlnRelayConf.get()
check rlnRelayConf.ethContractAddress.string == contractAddress
check rlnRelayConf.ethContractAddress.string ==
clusterConf.rlnRelayEthContractAddress
check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic
check rlnRelayConf.chainId == clusterConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec

View File

@ -3,7 +3,7 @@
{.push raises: [].}
import
std/[options, sequtils, deques, random],
std/[options, sequtils, deques, random, locks],
results,
stew/byteutils,
testutils/unittests,
@ -28,58 +28,71 @@ import
../testlib/wakucore,
./utils_onchain
var testLock: Lock
initLock(testLock)
suite "Onchain group manager":
# We run Anvil
let runAnvil {.used.} = runAnvil()
setup:
# Acquire lock to ensure tests run sequentially
acquire(testLock)
var manager {.threadvar.}: OnchainGroupManager
let runAnvil {.used.} = runAnvil()
asyncSetup:
manager = await setupOnchainGroupManager()
var manager {.threadvar.}: OnchainGroupManager
manager = waitFor setupOnchainGroupManager()
asyncTeardown:
await manager.stop()
teardown:
waitFor manager.stop()
stopAnvil(runAnvil)
# Release lock after test completes
release(testLock)
asyncTest "should initialize successfully":
(await manager.init()).isOkOr:
test "should initialize successfully":
(waitFor manager.init()).isOkOr:
raiseAssert $error
check:
manager.ethRpc.isSome()
manager.wakuRlnContract.isSome()
manager.initialized
manager.rlnRelayMaxMessageLimit == 100
manager.rlnRelayMaxMessageLimit == 600
asyncTest "should error on initialization when chainId does not match":
test "should error on initialization when chainId does not match":
manager.chainId = utils_onchain.CHAIN_ID + 1
(await manager.init()).isErrOr:
(waitFor manager.init()).isErrOr:
raiseAssert "Expected error when chainId does not match"
asyncTest "should initialize when chainId is set to 0":
test "should initialize when chainId is set to 0":
manager.chainId = 0x0'u256
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
asyncTest "should error on initialization when loaded metadata does not match":
(await manager.init()).isOkOr:
test "should error on initialization when loaded metadata does not match":
(waitFor manager.init()).isOkOr:
assert false, $error
let metadataSetRes = manager.setMetadata()
assert metadataSetRes.isOk(), metadataSetRes.error
let metadataOpt = manager.rlnInstance.getMetadata().valueOr:
assert false, $error
return
assert metadataOpt.isSome(), "metadata is not set"
let metadata = metadataOpt.get()
assert metadata.chainId == 1337, "chainId is not equal to 1337"
assert metadata.chainId == 1234, "chainId is not equal to 1234"
assert metadata.contractAddress == manager.ethContractAddress,
"contractAddress is not equal to " & manager.ethContractAddress
let differentContractAddress = await uploadRLNContract(manager.ethClientUrls[0])
let web3 = manager.ethRpc.get()
let accounts = waitFor web3.provider.eth_accounts()
web3.defaultAccount = accounts[2]
let (privateKey, acc) = createEthAccount(web3)
let tokenAddress = (waitFor deployTestToken(privateKey, acc, web3)).valueOr:
assert false, "Failed to deploy test token contract: " & $error
return
let differentContractAddress = (
waitFor executeForgeContractDeployScripts(privateKey, acc, web3)
).valueOr:
assert false, "Failed to deploy RLN contract: " & $error
return
# simulating a change in the contractAddress
let manager2 = OnchainGroupManager(
ethClientUrls: @[EthClient],
@ -89,52 +102,47 @@ suite "Onchain group manager":
assert false, errStr
,
)
let e = await manager2.init()
let e = waitFor manager2.init()
(e).isErrOr:
assert false, "Expected error when contract address doesn't match"
asyncTest "should error if contract does not exist":
test "should error if contract does not exist":
manager.ethContractAddress = "0x0000000000000000000000000000000000000000"
var triggeredError = false
try:
discard await manager.init()
except CatchableError:
triggeredError = true
(waitFor manager.init()).isErrOr:
raiseAssert "Expected error when contract address doesn't exist"
check triggeredError
asyncTest "should error when keystore path and password are provided but file doesn't exist":
test "should error when keystore path and password are provided but file doesn't exist":
manager.keystorePath = some("/inexistent/file")
manager.keystorePassword = some("password")
(await manager.init()).isErrOr:
(waitFor manager.init()).isErrOr:
raiseAssert "Expected error when keystore file doesn't exist"
asyncTest "trackRootChanges: start tracking roots":
(await manager.init()).isOkOr:
test "trackRootChanges: start tracking roots":
(waitFor manager.init()).isOkOr:
raiseAssert $error
discard manager.trackRootChanges()
asyncTest "trackRootChanges: should guard against uninitialized state":
test "trackRootChanges: should guard against uninitialized state":
try:
discard manager.trackRootChanges()
except CatchableError:
check getCurrentExceptionMsg().len == 38
asyncTest "trackRootChanges: should sync to the state of the group":
test "trackRootChanges: should sync to the state of the group":
let credentials = generateCredentials(manager.rlnInstance)
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
let merkleRootBefore = manager.fetchMerkleRoot()
try:
await manager.register(credentials, UserMessageLimit(1))
waitFor manager.register(credentials, UserMessageLimit(20))
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
discard await withTimeout(trackRootChanges(manager), 15.seconds)
discard waitFor withTimeout(trackRootChanges(manager), 15.seconds)
let merkleRootAfter = manager.fetchMerkleRoot()
@ -151,7 +159,7 @@ suite "Onchain group manager":
metadata.validRoots == manager.validRoots.toSeq()
merkleRootBefore != merkleRootAfter
asyncTest "trackRootChanges: should fetch history correctly":
test "trackRootChanges: should fetch history correctly":
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
# relies on a busy loop rather than event-based monitoring. As a result, some root changes
# may be missed, leading to inconsistent test results (i.e., it may randomly return true or false).
@ -159,15 +167,16 @@ suite "Onchain group manager":
# after each registration.
const credentialCount = 6
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
let merkleRootBefore = manager.fetchMerkleRoot()
try:
for i in 0 ..< credentials.len():
await manager.register(credentials[i], UserMessageLimit(1))
discard await manager.updateRoots()
debug "Registering credential", index = i, credential = credentials[i]
waitFor manager.register(credentials[i], UserMessageLimit(20))
discard waitFor manager.updateRoots()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
@ -177,13 +186,13 @@ suite "Onchain group manager":
merkleRootBefore != merkleRootAfter
manager.validRoots.len() == credentialCount
asyncTest "register: should guard against uninitialized state":
test "register: should guard against uninitialized state":
let dummyCommitment = default(IDCommitment)
try:
await manager.register(
waitFor manager.register(
RateCommitment(
idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(1)
idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20)
)
)
except CatchableError:
@ -191,18 +200,18 @@ suite "Onchain group manager":
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()
asyncTest "register: should register successfully":
test "register: should register successfully":
# TODO :- similar to ```trackRootChanges: should fetch history correctly```
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
let idCommitment = generateCredentials(manager.rlnInstance).idCommitment
let merkleRootBefore = manager.fetchMerkleRoot()
try:
await manager.register(
waitFor manager.register(
RateCommitment(
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1)
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20)
)
)
except Exception, CatchableError:
@ -215,47 +224,47 @@ suite "Onchain group manager":
merkleRootAfter != merkleRootBefore
manager.latestIndex == 1
asyncTest "register: callback is called":
test "register: callback is called":
let idCredentials = generateCredentials(manager.rlnInstance)
let idCommitment = idCredentials.idCommitment
let fut = newFuture[void]()
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(1)).get()
let rateCommitment = getRateCommitment(idCredentials, UserMessageLimit(20)).get()
check:
registrations.len == 1
registrations[0].rateCommitment == rateCommitment
registrations[0].index == 0
fut.complete()
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
manager.onRegister(callback)
try:
await manager.register(
waitFor manager.register(
RateCommitment(
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(1)
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20)
)
)
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
waitFor fut
asyncTest "withdraw: should guard against uninitialized state":
test "withdraw: should guard against uninitialized state":
let idSecretHash = generateCredentials(manager.rlnInstance).idSecretHash
try:
await manager.withdraw(idSecretHash)
waitFor manager.withdraw(idSecretHash)
except CatchableError:
assert true
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()
asyncTest "validateRoot: should validate good root":
test "validateRoot: should validate good root":
let idCredentials = generateCredentials(manager.rlnInstance)
let idCommitment = idCredentials.idCommitment
@ -264,27 +273,27 @@ suite "Onchain group manager":
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].rateCommitment ==
getRateCommitment(idCredentials, UserMessageLimit(1)).get() and
getRateCommitment(idCredentials, UserMessageLimit(20)).get() and
registrations[0].index == 0:
manager.idCredentials = some(idCredentials)
fut.complete()
manager.onRegister(callback)
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
try:
await manager.register(idCredentials, UserMessageLimit(1))
waitFor manager.register(idCredentials, UserMessageLimit(20))
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
waitFor fut
let rootUpdated = await manager.updateRoots()
let rootUpdated = waitFor manager.updateRoots()
if rootUpdated:
let proofResult = await manager.fetchMerkleProofElements()
let proofResult = waitFor manager.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
manager.merkleProofCache = proofResult.get()
@ -306,14 +315,14 @@ suite "Onchain group manager":
check:
validated
asyncTest "validateRoot: should reject bad root":
test "validateRoot: should reject bad root":
let idCredentials = generateCredentials(manager.rlnInstance)
let idCommitment = idCredentials.idCommitment
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
manager.userMessageLimit = some(UserMessageLimit(1))
manager.userMessageLimit = some(UserMessageLimit(20))
manager.membershipIndex = some(MembershipIndex(0))
manager.idCredentials = some(idCredentials)
@ -339,9 +348,9 @@ suite "Onchain group manager":
check:
validated == false
asyncTest "verifyProof: should verify valid proof":
test "verifyProof: should verify valid proof":
let credentials = generateCredentials(manager.rlnInstance)
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
let fut = newFuture[void]()
@ -349,7 +358,7 @@ suite "Onchain group manager":
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].rateCommitment ==
getRateCommitment(credentials, UserMessageLimit(1)).get() and
getRateCommitment(credentials, UserMessageLimit(20)).get() and
registrations[0].index == 0:
manager.idCredentials = some(credentials)
fut.complete()
@ -357,15 +366,15 @@ suite "Onchain group manager":
manager.onRegister(callback)
try:
await manager.register(credentials, UserMessageLimit(1))
waitFor manager.register(credentials, UserMessageLimit(20))
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await fut
waitFor fut
let rootUpdated = await manager.updateRoots()
let rootUpdated = waitFor manager.updateRoots()
if rootUpdated:
let proofResult = await manager.fetchMerkleProofElements()
let proofResult = waitFor manager.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
manager.merkleProofCache = proofResult.get()
@ -388,21 +397,21 @@ suite "Onchain group manager":
check:
verified
asyncTest "verifyProof: should reject invalid proof":
(await manager.init()).isOkOr:
test "verifyProof: should reject invalid proof":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let idCredential = generateCredentials(manager.rlnInstance)
try:
await manager.register(idCredential, UserMessageLimit(1))
waitFor manager.register(idCredential, UserMessageLimit(20))
except Exception, CatchableError:
assert false,
"exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
let messageBytes = "Hello".toBytes()
let rootUpdated = await manager.updateRoots()
let rootUpdated = waitFor manager.updateRoots()
manager.merkleProofCache = newSeq[byte](640)
for i in 0 ..< 640:
@ -427,10 +436,10 @@ suite "Onchain group manager":
check:
verified == false
asyncTest "root queue should be updated correctly":
test "root queue should be updated correctly":
const credentialCount = 12
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
(await manager.init()).isOkOr:
(waitFor manager.init()).isOkOr:
raiseAssert $error
type TestBackfillFuts = array[0 .. credentialCount - 1, Future[void]]
@ -445,7 +454,7 @@ suite "Onchain group manager":
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].rateCommitment ==
getRateCommitment(credentials[futureIndex], UserMessageLimit(1)).get() and
getRateCommitment(credentials[futureIndex], UserMessageLimit(20)).get() and
registrations[0].index == MembershipIndex(futureIndex):
futs[futureIndex].complete()
futureIndex += 1
@ -456,47 +465,40 @@ suite "Onchain group manager":
manager.onRegister(generateCallback(futures, credentials))
for i in 0 ..< credentials.len():
await manager.register(credentials[i], UserMessageLimit(1))
discard await manager.updateRoots()
waitFor manager.register(credentials[i], UserMessageLimit(20))
discard waitFor manager.updateRoots()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
await allFutures(futures)
waitFor allFutures(futures)
check:
manager.validRoots.len() == credentialCount
asyncTest "isReady should return false if ethRpc is none":
(await manager.init()).isOkOr:
test "isReady should return false if ethRpc is none":
(waitFor manager.init()).isOkOr:
raiseAssert $error
manager.ethRpc = none(Web3)
var isReady = true
try:
isReady = await manager.isReady()
isReady = waitFor manager.isReady()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
check:
isReady == false
asyncTest "isReady should return true if ethRpc is ready":
(await manager.init()).isOkOr:
test "isReady should return true if ethRpc is ready":
(waitFor manager.init()).isOkOr:
raiseAssert $error
var isReady = false
try:
isReady = await manager.isReady()
isReady = waitFor manager.isReady()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
check:
isReady == true
################################
## Terminating/removing Anvil
################################
# We stop Anvil daemon
stopAnvil(runAnvil)

View File

@ -30,7 +30,7 @@ import
../testlib/common,
./utils
const CHAIN_ID* = 1337'u256
const CHAIN_ID* = 1234'u256
template skip0xPrefix(hexStr: string): int =
## Returns the index of the first meaningful char in `hexStr` by skipping
@ -61,64 +61,347 @@ proc generateCredentials*(rlnInstance: ptr RLN, n: int): seq[IdentityCredential]
credentials.add(generateCredentials(rlnInstance))
return credentials
# a util function used for testing purposes
# it deploys membership contract on Anvil (or any Eth client available on EthClient address)
# must be edited if used for a different contract than membership contract
# <the difference between this and rln-v1 is that there is no need to deploy the poseidon hasher contract>
proc uploadRLNContract*(ethClientAddress: string): Future[Address] {.async.} =
let web3 = await newWeb3(ethClientAddress)
debug "web3 connected to", ethClientAddress
proc getContractAddressFromDeployScriptOutput(output: string): Result[string, string] =
const searchStr = "Return ==\n0: address "
const addressLength = 42 # Length of an Ethereum address in hex format
let idx = output.find(searchStr)
if idx >= 0:
let startPos = idx + searchStr.len
let endPos = output.find('\n', startPos)
if (endPos - startPos) >= addressLength:
let address = output[startPos ..< endPos]
return ok(address)
return err("Unable to find contract address in deploy script output")
# fetch the list of registered accounts
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[1]
let add = web3.defaultAccount
debug "contract deployer account address ", add
proc getForgePath(): string =
var forgePath = ""
if existsEnv("XDG_CONFIG_HOME"):
forgePath = joinPath(forgePath, os.getEnv("XDG_CONFIG_HOME", ""))
else:
forgePath = joinPath(forgePath, os.getEnv("HOME", ""))
forgePath = joinPath(forgePath, ".foundry/bin/forge")
return $forgePath
let balance =
await web3.provider.eth_getBalance(web3.defaultAccount, blockId("latest"))
debug "Initial account balance: ", balance
contract(ERC20Token):
proc allowance(owner: Address, spender: Address): UInt256 {.view.}
proc balanceOf(account: Address): UInt256 {.view.}
# deploy poseidon hasher bytecode
let poseidonT3Receipt = await web3.deployContract(PoseidonT3)
let poseidonT3Address = poseidonT3Receipt.contractAddress.get()
let poseidonAddressStripped = strip0xPrefix($poseidonT3Address)
proc getTokenBalance(
web3: Web3, tokenAddress: Address, account: Address
): Future[UInt256] {.async.} =
let token = web3.contractSender(ERC20Token, tokenAddress)
return await token.balanceOf(account).call()
# deploy lazy imt bytecode
let lazyImtReceipt = await web3.deployContract(
LazyIMT.replace("__$PoseidonT3$__", poseidonAddressStripped)
)
let lazyImtAddress = lazyImtReceipt.contractAddress.get()
let lazyImtAddressStripped = strip0xPrefix($lazyImtAddress)
proc ethToWei(eth: UInt256): UInt256 =
eth * 1000000000000000000.u256
# deploy waku rlnv2 contract
let wakuRlnContractReceipt = await web3.deployContract(
WakuRlnV2Contract.replace("__$PoseidonT3$__", poseidonAddressStripped).replace(
"__$LazyIMT$__", lazyImtAddressStripped
proc sendMintCall(
web3: Web3,
accountFrom: Address,
tokenAddress: Address,
recipientAddress: Address,
amountTokens: UInt256,
recipientBalanceBeforeExpectedTokens: Option[UInt256] = none(UInt256),
): Future[TxHash] {.async.} =
let doBalanceAssert = recipientBalanceBeforeExpectedTokens.isSome()
if doBalanceAssert:
let balanceBeforeMint = await getTokenBalance(web3, tokenAddress, recipientAddress)
let balanceBeforeExpectedTokens = recipientBalanceBeforeExpectedTokens.get()
assert balanceBeforeMint == balanceBeforeExpectedTokens,
fmt"Balance is {balanceBeforeMint} before minting but expected {balanceBeforeExpectedTokens}"
# Create mint transaction
# Method ID for mint(address,uint256) is 0x40c10f19 which is part of the openzeppelin ERC20 standard
# The method ID for a deployed test token can be viewed here https://sepolia.lineascan.build/address/0x185A0015aC462a0aECb81beCc0497b649a64B9ea#writeContract
let mintSelector = "0x40c10f19"
let addressHex = recipientAddress.toHex()
# Pad the address and amount to 32 bytes each
let paddedAddress = addressHex.align(64, '0')
let amountHex = amountTokens.toHex()
let amountWithout0x =
if amountHex.toLower().startsWith("0x"):
amountHex[2 .. ^1]
else:
amountHex
let paddedAmount = amountWithout0x.align(64, '0')
let mintCallData = mintSelector & paddedAddress & paddedAmount
let gasPrice = int(await web3.provider.eth_gasPrice())
# Create the transaction
var tx: TransactionArgs
tx.`from` = Opt.some(accountFrom)
tx.to = Opt.some(tokenAddress)
tx.value = Opt.some(0.u256) # No ETH is sent for token operations
tx.gasPrice = Opt.some(Quantity(gasPrice))
tx.data = Opt.some(byteutils.hexToSeqByte(mintCallData))
trace "Sending mint call"
let txHash = await web3.send(tx)
let balanceOfSelector = "0x70a08231"
let balanceCallData = balanceOfSelector & paddedAddress
# Wait a bit for transaction to be mined
await sleepAsync(500.milliseconds)
if doBalanceAssert:
let balanceAfterMint = await getTokenBalance(web3, tokenAddress, recipientAddress)
let balanceAfterExpectedTokens =
recipientBalanceBeforeExpectedTokens.get() + amountTokens
assert balanceAfterMint == balanceAfterExpectedTokens,
fmt"Balance is {balanceAfterMint} after transfer but expected {balanceAfterExpectedTokens}"
return txHash
# Check how many tokens a spender (the RLN contract) is allowed to spend on behalf of the owner (account which wishes to register a membership)
proc checkTokenAllowance(
web3: Web3, tokenAddress: Address, owner: Address, spender: Address
): Future[UInt256] {.async.} =
let token = web3.contractSender(ERC20Token, tokenAddress)
let allowance = await token.allowance(owner, spender).call()
trace "Current allowance", owner = owner, spender = spender, allowance = allowance
return allowance
proc setupContractDeployment(
forgePath: string, submodulePath: string
): Result[void, string] =
trace "Contract deployer paths", forgePath = forgePath, submodulePath = submodulePath
# Build the Foundry project
try:
let (forgeCleanOutput, forgeCleanExitCode) =
execCmdEx(fmt"""cd {submodulePath} && {forgePath} clean""")
trace "Executed forge clean command", output = forgeCleanOutput
if forgeCleanExitCode != 0:
return err("forge clean command failed")
let (forgeInstallOutput, forgeInstallExitCode) =
execCmdEx(fmt"""cd {submodulePath} && {forgePath} install""")
trace "Executed forge install command", output = forgeInstallOutput
if forgeInstallExitCode != 0:
return err("forge install command failed")
let (pnpmInstallOutput, pnpmInstallExitCode) =
execCmdEx(fmt"""cd {submodulePath} && pnpm install""")
trace "Executed pnpm install command", output = pnpmInstallOutput
if pnpmInstallExitCode != 0:
return err("pnpm install command failed" & pnpmInstallOutput)
let (forgeBuildOutput, forgeBuildExitCode) =
execCmdEx(fmt"""cd {submodulePath} && {forgePath} build""")
trace "Executed forge build command", output = forgeBuildOutput
if forgeBuildExitCode != 0:
return err("forge build command failed")
# Set the environment variable API keys to anything for local testnet deployment
putEnv("API_KEY_CARDONA", "123")
putEnv("API_KEY_LINEASCAN", "123")
putEnv("API_KEY_ETHERSCAN", "123")
except OSError, IOError:
return err("Command execution failed: " & getCurrentExceptionMsg())
return ok()
proc deployTestToken*(
pk: keys.PrivateKey, acc: Address, web3: Web3
): Future[Result[Address, string]] {.async.} =
## Executes a Foundry forge script that deploys the a token contract (ERC-20) used for testing. This is a prerequisite to enable the contract deployment and this token contract address needs to be minted and approved for the accounts that need to register memberships with the contract
## submodulePath: path to the submodule containing contract deploy scripts
# All RLN related tests should be run from the root directory of the project
let submodulePath = absolutePath("./vendor/waku-rlnv2-contract")
# Verify submodule path exists
if not dirExists(submodulePath):
error "Submodule path does not exist", submodulePath = submodulePath
return err("Submodule path does not exist: " & submodulePath)
let forgePath = getForgePath()
setupContractDeployment(forgePath, submodulePath).isOkOr:
error "Failed to setup contract deployment", error = $error
return err("Failed to setup contract deployment: " & $error)
# Deploy TestToken contract
let forgeCmdTestToken =
fmt"""cd {submodulePath} && {forgePath} script test/TestToken.sol --broadcast -vvv --rpc-url http://localhost:8540 --tc TestTokenFactory --private-key {pk} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json"""
let (outputDeployTestToken, exitCodeDeployTestToken) = execCmdEx(forgeCmdTestToken)
trace "Executed forge command to deploy TestToken contract",
output = outputDeployTestToken
if exitCodeDeployTestToken != 0:
return error("Forge command to deploy TestToken contract failed")
# Parse the command output to find contract address
let testTokenAddress = getContractAddressFromDeployScriptOutput(outputDeployTestToken).valueOr:
error "Failed to get TestToken contract address from deploy script output",
error = $error
return err(
"Failed to get TestToken contract address from deploy script output: " & $error
)
)
let wakuRlnContractAddress = wakuRlnContractReceipt.contractAddress.get()
let wakuRlnAddressStripped = strip0xPrefix($wakuRlnContractAddress)
debug "Address of the TestToken contract", testTokenAddress
debug "Address of the deployed rlnv2 contract: ", wakuRlnContractAddress
let testTokenAddressBytes = hexToByteArray[20](testTokenAddress)
let testTokenAddressAddress = Address(testTokenAddressBytes)
putEnv("TOKEN_ADDRESS", testTokenAddressAddress.toHex())
# need to send concat: impl & init_bytes
let contractInput =
byteutils.toHex(encode(wakuRlnContractAddress)) & Erc1967ProxyContractInput
debug "contractInput", contractInput
let proxyReceipt =
await web3.deployContract(Erc1967Proxy, contractInput = contractInput)
return ok(testTokenAddressAddress)
debug "proxy receipt", contractAddress = proxyReceipt.contractAddress.get()
let proxyAddress = proxyReceipt.contractAddress.get()
# Sends an ERC20 token approval call to allow a spender to spend a certain amount of tokens on behalf of the owner
proc approveTokenAllowanceAndVerify*(
web3: Web3,
accountFrom: Address,
privateKey: keys.PrivateKey,
tokenAddress: Address,
spender: Address,
amountWei: UInt256,
expectedAllowanceBefore: Option[UInt256] = none(UInt256),
): Future[Result[TxHash, string]] {.async.} =
var allowanceBefore: UInt256
if expectedAllowanceBefore.isSome():
allowanceBefore =
await checkTokenAllowance(web3, tokenAddress, accountFrom, spender)
let expected = expectedAllowanceBefore.get()
if allowanceBefore != expected:
return
err(fmt"Allowance is {allowanceBefore} before approval but expected {expected}")
let newBalance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest")
debug "Account balance after the contract deployment: ", newBalance
# Temporarily set the private key
let oldPrivateKey = web3.privateKey
web3.privateKey = Opt.some(privateKey)
web3.lastKnownNonce = Opt.none(Quantity)
try:
# ERC20 approve function signature: approve(address spender, uint256 amount)
# Method ID for approve(address,uint256) is 0x095ea7b3
const APPROVE_SELECTOR = "0x095ea7b3"
let addressHex = spender.toHex().align(64, '0')
let amountHex = amountWei.toHex().align(64, '0')
let approveCallData = APPROVE_SELECTOR & addressHex & amountHex
let gasPrice = await web3.provider.eth_gasPrice()
var tx: TransactionArgs
tx.`from` = Opt.some(accountFrom)
tx.to = Opt.some(tokenAddress)
tx.value = Opt.some(0.u256)
tx.gasPrice = Opt.some(gasPrice)
tx.gas = Opt.some(Quantity(100000))
tx.data = Opt.some(byteutils.hexToSeqByte(approveCallData))
tx.chainId = Opt.some(CHAIN_ID)
trace "Sending approve call", tx = tx
let txHash = await web3.send(tx)
let receipt = await web3.getMinedTransactionReceipt(txHash)
if receipt.status.isNone():
return err("Approval transaction failed receipt is none")
if receipt.status.get() != 1.Quantity:
return err("Approval transaction failed status quantity not 1")
# Single verification check after mining (no extra sleep needed)
let allowanceAfter =
await checkTokenAllowance(web3, tokenAddress, accountFrom, spender)
let expectedAfter =
if expectedAllowanceBefore.isSome():
expectedAllowanceBefore.get() + amountWei
else:
amountWei
if allowanceAfter < expectedAfter:
return err(
fmt"Allowance is {allowanceAfter} after approval but expected at least {expectedAfter}"
)
return ok(txHash)
except CatchableError as e:
return err(fmt"Failed to send approve transaction: {e.msg}")
finally:
# Restore the old private key
web3.privateKey = oldPrivateKey
proc executeForgeContractDeployScripts*(
privateKey: keys.PrivateKey, acc: Address, web3: Web3
): Future[Result[Address, string]] {.async, gcsafe.} =
## Executes a set of foundry forge scripts required to deploy the RLN contract and returns the deployed proxy contract address
## submodulePath: path to the submodule containing contract deploy scripts
# All RLN related tests should be run from the root directory of the project
let submodulePath = "./vendor/waku-rlnv2-contract"
# Verify submodule path exists
if not dirExists(submodulePath):
error "Submodule path does not exist", submodulePath = submodulePath
return err("Submodule path does not exist: " & submodulePath)
let forgePath = getForgePath()
debug "Forge path", forgePath
# Verify forge executable exists
if not fileExists(forgePath):
error "Forge executable not found", forgePath = forgePath
return err("Forge executable not found: " & forgePath)
trace "contract deployer account details", account = acc, privateKey = privateKey
let setupContractEnv = setupContractDeployment(forgePath, submodulePath)
if setupContractEnv.isErr():
error "Failed to setup contract deployment"
return err("Failed to setup contract deployment")
# Deploy LinearPriceCalculator contract
let forgeCmdPriceCalculator =
fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployPriceCalculator --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json"""
let (outputDeployPriceCalculator, exitCodeDeployPriceCalculator) =
execCmdEx(forgeCmdPriceCalculator)
trace "Executed forge command to deploy LinearPriceCalculator contract",
output = outputDeployPriceCalculator
if exitCodeDeployPriceCalculator != 0:
return error("Forge command to deploy LinearPriceCalculator contract failed")
# Parse the output to find contract address
let priceCalculatorAddressRes =
getContractAddressFromDeployScriptOutput(outputDeployPriceCalculator)
if priceCalculatorAddressRes.isErr():
error "Failed to get LinearPriceCalculator contract address from deploy script output"
let priceCalculatorAddress = priceCalculatorAddressRes.get()
debug "Address of the LinearPriceCalculator contract", priceCalculatorAddress
putEnv("PRICE_CALCULATOR_ADDRESS", priceCalculatorAddress)
let forgeCmdWakuRln =
fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployWakuRlnV2 --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json"""
let (outputDeployWakuRln, exitCodeDeployWakuRln) = execCmdEx(forgeCmdWakuRln)
trace "Executed forge command to deploy WakuRlnV2 contract",
output = outputDeployWakuRln
if exitCodeDeployWakuRln != 0:
error "Forge command to deploy WakuRlnV2 contract failed",
output = outputDeployWakuRln
# Parse the output to find contract address
let wakuRlnV2AddressRes =
getContractAddressFromDeployScriptOutput(outputDeployWakuRln)
if wakuRlnV2AddressRes.isErr():
error "Failed to get WakuRlnV2 contract address from deploy script output"
##TODO: raise exception here?
let wakuRlnV2Address = wakuRlnV2AddressRes.get()
debug "Address of the WakuRlnV2 contract", wakuRlnV2Address
putEnv("WAKURLNV2_ADDRESS", wakuRlnV2Address)
# Deploy Proxy contract
let forgeCmdProxy =
fmt"""cd {submodulePath} && {forgePath} script script/Deploy.s.sol --broadcast -vvvv --rpc-url http://localhost:8540 --tc DeployProxy --private-key {privateKey} && rm -rf broadcast/*/*/run-1*.json && rm -rf cache/*/*/run-1*.json"""
let (outputDeployProxy, exitCodeDeployProxy) = execCmdEx(forgeCmdProxy)
trace "Executed forge command to deploy proxy contract", output = outputDeployProxy
if exitCodeDeployProxy != 0:
error "Forge command to deploy Proxy failed", error = outputDeployProxy
return err("Forge command to deploy Proxy failed")
let proxyAddress = getContractAddressFromDeployScriptOutput(outputDeployProxy)
let proxyAddressBytes = hexToByteArray[20](proxyAddress.get())
let proxyAddressAddress = Address(proxyAddressBytes)
info "Address of the Proxy contract", proxyAddressAddress
await web3.close()
debug "disconnected from ", ethClientAddress
return proxyAddress
return ok(proxyAddressAddress)
proc sendEthTransfer*(
web3: Web3,
@ -133,7 +416,7 @@ proc sendEthTransfer*(
let balanceBeforeWei = await web3.provider.eth_getBalance(accountTo, "latest")
let balanceBeforeExpectedWei = accountToBalanceBeforeExpectedWei.get()
assert balanceBeforeWei == balanceBeforeExpectedWei,
fmt"Balance is {balanceBeforeWei} but expected {balanceBeforeExpectedWei}"
fmt"Balance is {balanceBeforeWei} before transfer but expected {balanceBeforeExpectedWei}"
let gasPrice = int(await web3.provider.eth_gasPrice())
@ -146,17 +429,17 @@ proc sendEthTransfer*(
# TODO: handle the error if sending fails
let txHash = await web3.send(tx)
# Wait a bit for transaction to be mined
await sleepAsync(200.milliseconds)
if doBalanceAssert:
let balanceAfterWei = await web3.provider.eth_getBalance(accountTo, "latest")
let balanceAfterExpectedWei = accountToBalanceBeforeExpectedWei.get() + amountWei
assert balanceAfterWei == balanceAfterExpectedWei,
fmt"Balance is {balanceAfterWei} but expected {balanceAfterExpectedWei}"
fmt"Balance is {balanceAfterWei} after transfer but expected {balanceAfterExpectedWei}"
return txHash
proc ethToWei(eth: UInt256): UInt256 =
eth * 1000000000000000000.u256
proc createEthAccount*(
ethAmount: UInt256 = 1000.u256
): Future[(keys.PrivateKey, Address)] {.async.} =
@ -198,7 +481,7 @@ proc getAnvilPath*(): string =
return $anvilPath
# Runs Anvil daemon
proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process =
proc runAnvil*(port: int = 8540, chainId: string = "1234"): Process =
# Passed options are
# --port Port to listen on.
# --gas-limit Sets the block gas limit in WEI.
@ -212,13 +495,13 @@ proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process =
anvilPath,
args = [
"--port",
"8540",
$port,
"--gas-limit",
"300000000000000",
"--balance",
"1000000000",
"--chain-id",
$CHAIN_ID,
$chainId,
],
options = {poUsePath},
)
@ -242,14 +525,26 @@ proc runAnvil*(port: int = 8540, chainId: string = "1337"): Process =
# Stops Anvil daemon
proc stopAnvil*(runAnvil: Process) {.used.} =
if runAnvil.isNil:
debug "stopAnvil called with nil Process"
return
let anvilPID = runAnvil.processID
# We wait the daemon to exit
debug "Stopping Anvil daemon", anvilPID = anvilPID
try:
# We terminate Anvil daemon by sending a SIGTERM signal to the runAnvil PID to trigger RPC server termination and clean-up
kill(runAnvil)
debug "Sent SIGTERM to Anvil", anvilPID = anvilPID
except:
error "Anvil daemon termination failed: ", err = getCurrentExceptionMsg()
# Send termination signals
when not defined(windows):
discard execCmdEx(fmt"kill -TERM {anvilPID}")
discard execCmdEx(fmt"kill -9 {anvilPID}")
else:
discard execCmdEx(fmt"taskkill /F /PID {anvilPID}")
# Close Process object to release resources
close(runAnvil)
debug "Anvil daemon stopped", anvilPID = anvilPID
except Exception as e:
debug "Error stopping Anvil daemon", anvilPID = anvilPID, error = e.msg
proc setupOnchainGroupManager*(
ethClientUrl: string = EthClient, amountEth: UInt256 = 10.u256
@ -261,12 +556,10 @@ proc setupOnchainGroupManager*(
let rlnInstance = rlnInstanceRes.get()
let contractAddress = await uploadRLNContract(ethClientUrl)
# connect to the eth client
let web3 = await newWeb3(ethClientUrl)
let accounts = await web3.provider.eth_accounts()
web3.defaultAccount = accounts[0]
web3.defaultAccount = accounts[1]
let (privateKey, acc) = createEthAccount(web3)
@ -276,6 +569,32 @@ proc setupOnchainGroupManager*(
web3, web3.defaultAccount, acc, ethToWei(1000.u256), some(0.u256)
)
let testTokenAddress = (await deployTestToken(privateKey, acc, web3)).valueOr:
assert false, "Failed to deploy test token contract: " & $error
return
# mint the token from the generated account
discard await sendMintCall(
web3, web3.defaultAccount, testTokenAddress, acc, ethToWei(1000.u256), some(0.u256)
)
let contractAddress = (await executeForgeContractDeployScripts(privateKey, acc, web3)).valueOr:
assert false, "Failed to deploy RLN contract: " & $error
return
# If the generated account wishes to register a membership, it needs to approve the contract to spend its tokens
let tokenApprovalResult = await approveTokenAllowanceAndVerify(
web3,
acc, # owner
privateKey,
testTokenAddress, # ERC20 token address
contractAddress, # spender - the proxy contract that will spend the tokens
ethToWei(200.u256),
some(0.u256), # expected allowance before approval
)
assert tokenApprovalResult.isOk, tokenApprovalResult.error()
let manager = OnchainGroupManager(
ethClientUrls: @[ethClientUrl],
ethContractAddress: $contractAddress,

View File

@ -26,6 +26,7 @@ proc newTestWakuRecon*(
wantsTx: AsyncQueue[PeerId],
needsTx: AsyncQueue[(PeerId, Fingerprint)],
cluster: uint16 = 1,
syncRange: timer.Duration = DefaultSyncRange,
shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7],
): Future[SyncReconciliation] {.async.} =
let peerManager = PeerManager.new(switch)
@ -36,6 +37,7 @@ proc newTestWakuRecon*(
peerManager = peerManager,
wakuArchive = nil,
relayJitter = 0.seconds,
syncRange = syncRange,
idsRx = idsRx,
localWantsTx = wantsTx,
remoteNeedsTx = needsTx,

View File

@ -1,8 +1,12 @@
{.used.}
import
std/[options, sets, random, math], testutils/unittests, chronos, libp2p/crypto/crypto
std/[options, sets, random, math, algorithm],
testutils/unittests,
chronos,
libp2p/crypto/crypto
import chronos, chronos/asyncsync
import nimcrypto
import
../../waku/[
node/peer_manager,
@ -21,6 +25,15 @@ import
../waku_archive/archive_utils,
./sync_utils
proc collectDiffs*(
chan: var Channel[SyncID], diffCount: int
): HashSet[WakuMessageHash] =
var received: HashSet[WakuMessageHash]
while received.len < diffCount:
let sid = chan.recv() # synchronous receive
received.incl sid.hash
result = received
suite "Waku Sync: reconciliation":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
@ -234,53 +247,376 @@ suite "Waku Sync: reconciliation":
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true
asyncTest "sync 2 nodes 10K msgs 1K diffs":
let msgCount = 10_000
var diffCount = 1_000
const
msgCount = 200_000 # total messages on the server
diffCount = 100 # messages initially missing on the client
var diffMsgHashes: HashSet[WakuMessageHash]
var randIndexes: HashSet[int]
## ── choose which messages will be absent from the client ─────────────
var missingIdx: HashSet[int]
while missingIdx.len < diffCount:
missingIdx.incl rand(0 ..< msgCount)
# Diffs
for i in 0 ..< diffCount:
var randInt = rand(0 ..< msgCount)
#make sure we actually have the right number of diffs
while randInt in randIndexes:
randInt = rand(0 ..< msgCount)
randIndexes.incl(randInt)
# sync window is 1 hour, spread msg equally in that time
let timeSlice = calculateTimeRange()
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
let (part, _) = divmod(timeWindow, 100_000)
var timestamp = timeSlice.a
## ── generate messages and pre-load the two reconcilers ───────────────
let slice = calculateTimeRange() # 1-hour window
let step = (int64(slice.b) - int64(slice.a)) div msgCount
var ts = slice.a
for i in 0 ..< msgCount:
let
msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic)
hash = computeMessageHash(DefaultPubsubTopic, msg)
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(h, msg) # every msg is on the server
if i notin missingIdx:
client.messageIngress(h, msg) # all but 100 are on the client
ts += Timestamp(step)
if i in randIndexes:
diffMsgHashes.incl(hash)
## ── sanity before we start the round ─────────────────────────────────
check remoteNeeds.len == 0
## ── launch reconciliation from the client towards the server ─────────
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
## ── verify that ≈100 diffs were queued (allow 10 % slack) ────────────
check remoteNeeds.len >= 90 # ≈ 100 × 0.9
asyncTest "sync 2 nodes 400K msgs 100k diffs":
const
msgCount = 400_000
diffCount = 100_000
tol = 1000
var diffMsgHashes: HashSet[WakuMessageHash]
var missingIdx: HashSet[int]
while missingIdx.len < diffCount:
missingIdx.incl rand(0 ..< msgCount)
let slice = calculateTimeRange()
let step = (int64(slice.b) - int64(slice.a)) div msgCount
var ts = slice.a
for i in 0 ..< msgCount:
let
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(h, msg)
if i notin missingIdx:
client.messageIngress(h, msg)
else:
client.messageIngress(hash, msg)
diffMsgHashes.incl h
timestamp += Timestamp(part)
continue
ts += Timestamp(step)
check:
remoteNeeds.len == 0
check remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
# timimg issue make it hard to match exact numbers
check:
remoteNeeds.len > 900
check remoteNeeds.len >= diffCount - tol and remoteNeeds.len < diffCount
let (_, deliveredHash) = await remoteNeeds.get()
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes 100 msgs 20 diff 1-second window":
const
msgCount = 100
diffCount = 20
var missingIdx: seq[int] = @[]
while missingIdx.len < diffCount:
let n = rand(0 ..< msgCount)
if n notin missingIdx:
missingIdx.add n
var diffMsgHashes: HashSet[WakuMessageHash]
let sliceEnd = now()
let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64
let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount
var ts = sliceStart
for i in 0 ..< msgCount:
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
if i in missingIdx:
diffMsgHashes.incl hash
else:
client.messageIngress(hash, msg)
ts += Timestamp(step)
check remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == diffCount
for _ in 0 ..< diffCount:
let (_, deliveredHash) = await remoteNeeds.get()
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes 500k msgs 300k diff stress window":
const
msgCount = 500_000
diffCount = 300_000
randomize()
var allIdx = newSeq[int](msgCount)
for i in 0 ..< msgCount:
allIdx[i] = i
shuffle(allIdx)
let missingIdx = allIdx[0 ..< diffCount]
var missingSet: HashSet[int]
for idx in missingIdx:
missingSet.incl idx
var diffMsgHashes: HashSet[WakuMessageHash]
let sliceEnd = now()
let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64
let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount
var ts = sliceStart
for i in 0 ..< msgCount:
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
if i in missingSet:
diffMsgHashes.incl hash
else:
client.messageIngress(hash, msg)
ts += Timestamp(step)
check remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == diffCount
for _ in 0 ..< 1000:
let (_, deliveredHash) = await remoteNeeds.get()
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes, 40 msgs: 20 in-window diff, 20 out-window ignored":
const
diffInWin = 20
diffOutWin = 20
stepOutNs = 100_000_000'u64
outOffsetNs = 2_300_000_000'u64 # for 20 mesg they sent 2 seconds earlier
randomize()
let nowNs = getNowInNanosecondTime()
let sliceStart = Timestamp(uint64(nowNs) - 700_000_000'u64)
let sliceEnd = nowNs
let stepIn = (sliceEnd.int64 - sliceStart.int64) div diffInWin
let oldStart = Timestamp(uint64(sliceStart) - outOffsetNs)
let stepOut = Timestamp(stepOutNs)
var inWinHashes, outWinHashes: HashSet[WakuMessageHash]
var ts = sliceStart
for _ in 0 ..< diffInWin:
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
inWinHashes.incl hash
ts += Timestamp(stepIn)
ts = oldStart
for _ in 0 ..< diffOutWin:
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
outWinHashes.incl hash
ts += Timestamp(stepOut)
check remoteNeeds.len == 0
let oneSec = timer.seconds(1)
server = await newTestWakuRecon(
serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
)
client = await newTestWakuRecon(
clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == diffInWin
for _ in 0 ..< diffInWin:
let (_, deliveredHashes) = await remoteNeeds.get()
check deliveredHashes in inWinHashes
check deliveredHashes notin outWinHashes
asyncTest "hash-fingerprint collision, same timestamp stable sort":
let ts = Timestamp(getNowInNanosecondTime())
var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
var msg2 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
msg2.payload[0] = msg2.payload[0] xor 0x01
var h1 = computeMessageHash(DefaultPubsubTopic, msg1)
var h2 = computeMessageHash(DefaultPubsubTopic, msg2)
for i in 0 ..< 8:
h2[i] = h1[i]
for i in 0 ..< 8:
check h1[i] == h2[i]
check h1 != h2
server.messageIngress(h1, msg1)
client.messageIngress(h2, msg2)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == 1
var vec = @[SyncID(time: ts, hash: h2), SyncID(time: ts, hash: h1)]
vec.shuffle()
vec.sort()
let hFirst = vec[0].hash
let hSecond = vec[1].hash
check vec[0].time == ts and vec[1].time == ts
asyncTest "malformed message-ID is ignored during reconciliation":
let nowTs = Timestamp(getNowInNanosecondTime())
let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic)
var goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg)
var badHash: WakuMessageHash
for i in 0 ..< 32:
badHash[i] = 0'u8
let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic)
server.messageIngress(goodHash, goodMsg)
server.messageIngress(badHash, badMsg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == 1
let (_, neededHash) = await remoteNeeds.get()
check neededHash == goodHash
check neededHash != badHash
asyncTest "malformed ID: future-timestamp msg is ignored":
let nowNs = getNowInNanosecondTime()
let tsNow = Timestamp(nowNs)
let goodMsg = fakeWakuMessage(ts = tsNow, contentTopic = DefaultContentTopic)
let goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg)
const tenYearsSec = 10 * 365 * 24 * 60 * 60
let futureNs = nowNs + int64(tenYearsSec) * 1_000_000_000'i64
let badTs = Timestamp(futureNs.uint64)
let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic)
let badHash = computeMessageHash(DefaultPubsubTopic, badMsg)
server.messageIngress(goodHash, goodMsg)
server.messageIngress(badHash, badMsg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == 1
let (_, neededHash) = await remoteNeeds.get()
check neededHash == goodHash
check neededHash != badHash
asyncTest "duplicate ID is queued only once":
let ts = Timestamp(getNowInNanosecondTime())
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(h, msg)
server.messageIngress(h, msg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check remoteNeeds.len == 1
let (_, neededHash) = await remoteNeeds.get()
check neededHash == h
asyncTest "sync terminates immediately when no diffs exist":
let ts = Timestamp(getNowInNanosecondTime())
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
client.messageIngress(hash, msg)
let idsQ = newAsyncQueue[SyncID]()
let wantsQ = newAsyncQueue[PeerId]()
let needsQ = newAsyncQueue[(PeerId, Fingerprint)]()
server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ)
client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ)
defer:
server.stop()
client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check needsQ.len == 0
suite "Waku Sync: transfer":
var
@ -396,3 +732,40 @@ suite "Waku Sync: transfer":
check:
response.messages.len > 0
asyncTest "Check the exact missing messages are received":
let timeSlice = calculateTimeRange()
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
let (part, _) = divmod(timeWindow, 3)
var ts = timeSlice.a
let msgA = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
ts += Timestamp(part)
let msgB = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
ts += Timestamp(part)
let msgC = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hA = computeMessageHash(DefaultPubsubTopic, msgA)
let hB = computeMessageHash(DefaultPubsubTopic, msgB)
let hC = computeMessageHash(DefaultPubsubTopic, msgC)
discard serverDriver.put(DefaultPubsubTopic, @[msgA, msgB, msgC])
discard clientDriver.put(DefaultPubsubTopic, @[msgA])
await serverRemoteNeeds.put((clientPeerInfo.peerId, hB))
await serverRemoteNeeds.put((clientPeerInfo.peerId, hC))
await clientLocalWants.put(serverPeerInfo.peerId)
await sleepAsync(1.seconds)
check serverRemoteNeeds.len == 0
let sid1 = await clientIds.get()
let sid2 = await clientIds.get()
let received = [sid1.hash, sid2.hash].toHashSet()
let expected = [hB, hC].toHashSet
check received == expected
check clientIds.len == 0

View File

@ -0,0 +1,242 @@
import unittest, nimcrypto, std/sequtils, results
import ../../waku/waku_store_sync/[reconciliation, common]
import ../../waku/waku_store_sync/storage/seq_storage
import ../../waku/waku_core/message/digest
proc toDigest(s: string): WakuMessageHash =
let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, (s.len - 1)))
var res: WakuMessageHash
for i in 0 .. 31:
res[i] = d.data[i]
return res
proc `..`(a, b: SyncID): Slice[SyncID] =
Slice[SyncID](a: a, b: b)
suite "Waku Sync reconciliation":
test "fan-out: eight fingerprint sub-ranges for large slice":
const N = 2_048
const mismatchI = 70
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var baseHashMismatch: WakuMessageHash
var remoteHashMismatch: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
if i == mismatchI:
baseHashMismatch = hashLocal
remoteHashMismatch = toDigest("msg" & $i & "_x")
hashRemote = remoteHashMismatch
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
assert false, "failed to insert hash: " & $error
var z: WakuMessageHash
let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
check local.computeFingerprint(whole) != remote.computeFingerprint(whole)
let remoteFp = remote.computeFingerprint(whole)
let payload = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(whole, RangeType.Fingerprint)],
fingerprints: @[remoteFp],
itemSets: @[],
)
var toSend, toRecv: seq[WakuMessageHash]
let reply = local.processPayload(payload, toSend, toRecv)
check reply.ranges.len == 8
check reply.ranges.allIt(it[1] == RangeType.Fingerprint)
check reply.itemSets.len == 0
check reply.fingerprints.len == 8
let mismTime = 1000 + mismatchI
var covered = false
for (slc, _) in reply.ranges:
if mismTime >= slc.a.time and mismTime <= slc.b.time:
covered = true
break
check covered
check toSend.len == 0
check toRecv.len == 0
test "splits mismatched fingerprint into two sub-ranges then item-set":
const threshold = 4
const partitions = 2
let local = SeqStorage.new(@[], threshold = threshold, partitions = partitions)
let remote = SeqStorage.new(@[], threshold = threshold, partitions = partitions)
var mismatchHash: WakuMessageHash
for i in 0 ..< 8:
let t = 1000 + i
let baseHash = toDigest("msg" & $i)
var localHash = baseHash
var remoteHash = baseHash
if i == 3:
mismatchHash = toDigest("msg" & $i & "_x")
localHash = mismatchHash
discard local.insert (SyncID(time: t, hash: localHash))
discard remote.insert(SyncID(time: t, hash: remoteHash))
var zeroHash: WakuMessageHash
let wholeRange =
SyncID(time: 1000, hash: zeroHash) .. SyncID(time: 1007, hash: zeroHash)
var toSend, toRecv: seq[WakuMessageHash]
let payload = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(wholeRange, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(wholeRange)],
itemSets: @[],
)
let reply = local.processPayload(payload, toSend, toRecv)
check reply.ranges.len == partitions
check reply.itemSets.len == partitions
check reply.itemSets.anyIt(
it.elements.anyIt(it.hash == mismatchHash and it.time == 1003)
)
test "second round when N =2048 & local ":
const N = 2_048
const mismatchI = 70
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var baseHashMismatch, remoteHashMismatch: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
if i == mismatchI:
baseHashMismatch = hashLocal
remoteHashMismatch = toDigest("msg" & $i & "_x")
hashRemote = remoteHashMismatch
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
assert false, "failed to insert hash: " & $error
var zero: WakuMessageHash
let sliceWhole =
SyncID(time: 1000, hash: zero) .. SyncID(time: 1000 + N - 1, hash: zero)
check local.computeFingerprint(sliceWhole) != remote.computeFingerprint(sliceWhole)
let payload1 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(sliceWhole, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sliceWhole)],
itemSets: @[],
)
var toSend, toRecv: seq[WakuMessageHash]
let reply1 = local.processPayload(payload1, toSend, toRecv)
check reply1.ranges.len == 8
check reply1.ranges.allIt(it[1] == RangeType.Fingerprint)
let mismTime = 1000 + mismatchI
var subSlice: Slice[SyncID]
for (sl, _) in reply1.ranges:
if mismTime >= sl.a.time and mismTime <= sl.b.time:
subSlice = sl
break
check subSlice.a.time != 0
let payload2 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(subSlice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(subSlice)],
itemSets: @[],
)
var toSend2, toRecv2: seq[WakuMessageHash]
let reply2 = local.processPayload(payload2, toSend2, toRecv2)
check reply2.ranges.len == 8
check reply2.ranges.allIt(it[1] == RangeType.ItemSet)
check reply2.itemSets.len == 8
var matchCount = 0
for iset in reply2.itemSets:
if iset.elements.anyIt(it.time == mismTime and it.hash == baseHashMismatch):
inc matchCount
check not iset.elements.anyIt(it.hash == remoteHashMismatch)
check matchCount == 1
check toSend2.len == 0
check toRecv2.len == 0
test "second-round payload remote":
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var baseHash: WakuMessageHash
var alteredHash: WakuMessageHash
for i in 0 ..< 8:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
if i == 3:
baseHash = hashLocal
alteredHash = toDigest("msg" & $i & "_x")
hashRemote = alteredHash
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
assert false, "failed to insert hash: " & $error
var zero: WakuMessageHash
let slice = SyncID(time: 1000, hash: zero) .. SyncID(time: 1007, hash: zero)
check local.computeFingerprint(slice) != remote.computeFingerprint(slice)
var toSend1, toRecv1: seq[WakuMessageHash]
let pay1 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(slice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(slice)],
itemSets: @[],
)
let rep1 = local.processPayload(pay1, toSend1, toRecv1)
check rep1.ranges.len == 1
check rep1.ranges[0][1] == RangeType.ItemSet
check toSend1.len == 0
check toRecv1.len == 0
var toSend2, toRecv2: seq[WakuMessageHash]
discard remote.processPayload(rep1, toSend2, toRecv2)
check toSend2.len == 1
check toSend2[0] == alteredHash
check toRecv2.len == 1
check toRecv2[0] == baseHash

View File

@ -0,0 +1,244 @@
import unittest, nimcrypto, std/sequtils
import ../../waku/waku_store_sync/[reconciliation, common]
import ../../waku/waku_store_sync/storage/seq_storage
import ../../waku/waku_core/message/digest
proc toDigest*(s: string): WakuMessageHash =
let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high))
for i in 0 .. 31:
result[i] = d.data[i]
proc `..`(a, b: SyncID): Slice[SyncID] =
Slice[SyncID](a: a, b: b)
suite "Waku Sync reconciliation":
test "Fingerprint → ItemSet → zero (default thresholds)":
const N = 2_000
const idx = 137
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var baseH, altH: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
var hr = h
if i == idx:
baseH = h
altH = toDigest("msg" & $i & "x")
hr = altH
discard remote.insert(SyncID(time: ts, hash: hr))
var z: WakuMessageHash
let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
var s1, r1: seq[WakuMessageHash]
let p1 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(whole, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(whole)],
itemSets: @[],
)
let rep1 = local.processPayload(p1, s1, r1)
check rep1.ranges.len == 8
check rep1.ranges.allIt(it[1] == RangeType.Fingerprint)
let mismT = 1000 + idx
let sub =
rep1.ranges.filterIt(mismT >= it[0].a.time and mismT <= it[0].b.time)[0][0]
var s2, r2: seq[WakuMessageHash]
let p2 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(sub, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sub)],
itemSets: @[],
)
let rep2 = local.processPayload(p2, s2, r2)
check rep2.ranges.len == 8
check rep2.ranges.allIt(it[1] == RangeType.ItemSet)
var s3, r3: seq[WakuMessageHash]
discard remote.processPayload(rep2, s3, r3)
check s3.len == 1 and s3[0] == altH
check r3.len == 1 and r3[0] == baseH
discard local.insert(SyncID(time: mismT, hash: altH))
discard remote.insert(SyncID(time: mismT, hash: baseH))
var s4, r4: seq[WakuMessageHash]
let p3 = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(sub, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sub)],
itemSets: @[],
)
let rep3 = local.processPayload(p3, s4, r4)
check rep3.ranges.len == 0
check s4.len == 0 and r4.len == 0
test "test 2 ranges includes 1 skip":
const N = 120
const pivot = 60
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var diffHash: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
var hr: WakuMessageHash
if i >= pivot:
diffHash = toDigest("msg" & $i & "_x")
hr = diffHash
else:
hr = h
discard remote.insert(SyncID(time: ts, hash: hr))
var z: WakuMessageHash
let sliceA = SyncID(time: 1000, hash: z) .. SyncID(time: 1059, hash: z)
let sliceB = SyncID(time: 1060, hash: z) .. SyncID(time: 1119, hash: z)
var s, r: seq[WakuMessageHash]
let payload = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)],
fingerprints:
@[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)],
itemSets: @[],
)
let reply = local.processPayload(payload, s, r)
check reply.ranges.len == 2
check reply.ranges[0][1] == RangeType.Skip
check reply.ranges[1][1] == RangeType.ItemSet
check reply.itemSets.len == 1
check not reply.itemSets[0].elements.anyIt(it.hash == diffHash)
test "custom threshold (50) → eight ItemSets first round":
const N = 300
const idx = 123
let local = SeqStorage.new(capacity = N, threshold = 50, partitions = 8)
let remote = SeqStorage.new(capacity = N, threshold = 50, partitions = 8)
var baseH, altH: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
var hr = h
if i == idx:
baseH = h
altH = toDigest("msg" & $i & "_x")
hr = altH
discard remote.insert(SyncID(time: ts, hash: hr))
var z: WakuMessageHash
let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
var toS, toR: seq[WakuMessageHash]
let p = RangesData(
cluster: 0,
shards: @[0],
ranges: @[(slice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(slice)],
itemSets: @[],
)
let reply = local.processPayload(p, toS, toR)
check reply.ranges.len == 8
check reply.ranges.allIt(it[1] == RangeType.ItemSet)
check reply.itemSets.len == 8
let mismT = 1000 + idx
var hit = 0
for ist in reply.itemSets:
if ist.elements.anyIt(it.time == mismT and it.hash == baseH):
inc hit
check hit == 1
test "test N=80K,3FP,2IS,SKIP":
const N = 80_000
const bad = N - 10
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
var baseH, altH: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
let hr =
if i == bad:
baseH = h
altH = toDigest("msg" & $i & "_x")
altH
else:
h
discard remote.insert(SyncID(time: ts, hash: hr))
var slice =
SyncID(time: 1000, hash: EmptyFingerprint) ..
SyncID(time: 1000 + N - 1, hash: FullFingerprint)
proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData =
local.processPayload(
RangesData(
cluster: 0,
shards: @[0],
ranges: @[(s, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(s)],
itemSets: @[],
),
sendQ,
recvQ,
)
var tmpS, tmpR: seq[WakuMessageHash]
for r in 1 .. 3:
let rep = fpReply(slice, tmpS, tmpR)
check rep.ranges.len == 8
check rep.ranges.allIt(it[1] == RangeType.Fingerprint)
for (sl, _) in rep.ranges:
if local.computeFingerprint(sl) != remote.computeFingerprint(sl):
slice = sl
break
let rep4 = fpReply(slice, tmpS, tmpR)
check rep4.ranges.len == 8
check rep4.ranges.allIt(it[1] == RangeType.ItemSet)
for (sl, _) in rep4.ranges:
if sl.a.time <= 1000 + bad and sl.b.time >= 1000 + bad:
slice = sl
break
var send5, recv5: seq[WakuMessageHash]
let rep5 = fpReply(slice, send5, recv5)
check rep5.ranges.len == 1
check rep5.ranges[0][1] == RangeType.ItemSet
var qSend, qRecv: seq[WakuMessageHash]
discard remote.processPayload(rep5, qSend, qRecv)
check qSend.len == 1 and qSend[0] == altH
check qRecv.len == 1 and qRecv[0] == baseH
discard local.insert(SyncID(time: slice.a.time, hash: altH))
discard remote.insert(SyncID(time: slice.a.time, hash: baseH))
var send6, recv6: seq[WakuMessageHash]
let rep6 = fpReply(slice, send6, recv6)
check rep6.ranges.len == 0
check send6.len == 0 and recv6.len == 0

@ -1 +1 @@
Subproject commit a576a8949ca20e310f2fbb4ec0bd05a57ac3045f
Subproject commit b7e9a9b1bc69256a2a3076c1f099b50ce84e7eff

View File

@ -1,3 +1,4 @@
import os
mode = ScriptMode.Verbose
### Package
@ -69,9 +70,15 @@ proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " &
extra_params & " " & srcDir & name & ".nim"
else:
exec "nim c" & " --out:build/" & name &
".so --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " &
extra_params & " " & srcDir & name & ".nim"
var lib_name = toDll("libwaku")
when defined(windows):
exec "nim c" & " --out:build/" & lib_name &
" --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:off -d:discv5_protocol_id=d5waku " &
extra_params & " " & srcDir & name & ".nim"
else:
exec "nim c" & " --out:build/" & lib_name &
" --threads:on --app:lib --opt:size --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:on -d:discv5_protocol_id=d5waku " &
extra_params & " " & srcDir & name & ".nim"
proc buildMobileAndroid(srcDir = ".", params = "") =
let cpu = getEnv("CPU")
@ -160,33 +167,20 @@ task testone, "Test custom target":
exec "build/" & filepath & ".bin"
### C Bindings
let chroniclesParams =
"-d:chronicles_line_numbers " & "-d:chronicles_runtime_filtering=on " &
"""-d:chronicles_sinks="textlines,json" """ &
"-d:chronicles_default_output_device=Dynamic " &
"""-d:chronicles_disabled_topics="eth,dnsdisc.client" """ & "--warning:Deprecated:off " &
"--warning:UnusedImport:on " & "-d:chronicles_log_level=TRACE"
task libwakuStatic, "Build the cbindings waku node library":
let name = "libwaku"
buildLibrary name,
"library/",
"""-d:chronicles_line_numbers \
-d:chronicles_runtime_filtering=on \
-d:chronicles_sinks="textlines,json" \
-d:chronicles_default_output_device=Dynamic \
-d:chronicles_disabled_topics="eth,dnsdisc.client" \
--warning:Deprecated:off \
--warning:UnusedImport:on \
-d:chronicles_log_level=TRACE """,
"static"
buildLibrary name, "library/", chroniclesParams, "static"
task libwakuDynamic, "Build the cbindings waku node library":
let name = "libwaku"
buildLibrary name,
"library/",
"""-d:chronicles_line_numbers \
-d:chronicles_runtime_filtering=on \
-d:chronicles_sinks="textlines,json" \
-d:chronicles_default_output_device=Dynamic \
-d:chronicles_disabled_topics="eth,dnsdisc.client" \
--warning:Deprecated:off \
--warning:UnusedImport:on \
-d:chronicles_log_level=TRACE """,
"dynamic"
buildLibrary name, "library/", chroniclesParams, "dynamic"
### Mobile Android
task libWakuAndroid, "Build the mobile bindings for Android":

View File

@ -38,6 +38,9 @@ proc withTableIpLimit*(b: var Discv5ConfBuilder, tableIpLimit: uint) =
proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: Port) =
b.udpPort = some(udpPort)
proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: uint) =
b.udpPort = some(Port(udpPort.uint16))
proc withBootstrapNodes*(b: var Discv5ConfBuilder, bootstrapNodes: seq[string]) =
# TODO: validate ENRs?
b.bootstrapNodes = concat(b.bootstrapNodes, bootstrapNodes)

View File

@ -21,6 +21,9 @@ proc withEnabled*(b: var DnsDiscoveryConfBuilder, enabled: bool) =
proc withEnrTreeUrl*(b: var DnsDiscoveryConfBuilder, enrTreeUrl: string) =
b.enrTreeUrl = some(enrTreeUrl)
proc withNameServers*(b: var DnsDiscoveryConfBuilder, nameServers: seq[IpAddress]) =
b.nameServers = nameServers
proc build*(b: DnsDiscoveryConfBuilder): Result[Option[DnsDiscoveryConf], string] =
if not b.enabled.get(false):
return ok(none(DnsDiscoveryConf))

View File

@ -1005,6 +1005,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.dnsDiscoveryConf.withEnabled(n.dnsDiscovery)
b.dnsDiscoveryConf.withEnrTreeUrl(n.dnsDiscoveryUrl)
b.dnsDiscoveryConf.withNameServers(n.dnsAddrsNameServers)
if n.discv5Discovery.isSome():
b.discv5Conf.withEnabled(n.discv5Discovery.get())

View File

@ -30,12 +30,12 @@ type ClusterConf* = object
# Cluster configuration corresponding to The Waku Network. Note that it
# overrides existing cli configuration
proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
const RelayChainId = 11155111'u256
const RelayChainId = 59141'u256
return ClusterConf(
maxMessageSize: "150KiB",
clusterId: 1,
rlnRelay: true,
rlnRelayEthContractAddress: "0xfe7a9eabcE779a090FD702346Fd0bFAc02ce6Ac8",
rlnRelayEthContractAddress: "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6",
rlnRelayDynamic: true,
rlnRelayChainId: RelayChainId,
rlnEpochSizeSec: 600,

View File

@ -30,23 +30,29 @@ logScope:
# using the when predicate does not work within the contract macro, hence need to dupe
contract(WakuRlnContract):
# this serves as an entrypoint into the rln membership set
proc register(idCommitment: UInt256, userMessageLimit: UInt32)
proc register(
idCommitment: UInt256, userMessageLimit: UInt32, idCommitmentsToErase: seq[UInt256]
)
# Initializes the implementation contract (only used in unit tests)
proc initialize(maxMessageLimit: UInt256)
# this event is raised when a new member is registered
proc MemberRegistered(rateCommitment: UInt256, index: UInt32) {.event.}
# this event is emitted when a new member is registered
proc MembershipRegistered(
idCommitment: UInt256, membershipRateLimit: UInt256, index: UInt32
) {.event.}
# this function denotes existence of a given user
proc memberExists(idCommitment: UInt256): UInt256 {.view.}
proc isInMembershipSet(idCommitment: Uint256): bool {.view.}
# this constant describes the next index of a new member
proc commitmentIndex(): UInt256 {.view.}
proc nextFreeIndex(): UInt256 {.view.}
# this constant describes the block number this contract was deployed on
proc deployedBlockNumber(): UInt256 {.view.}
# this constant describes max message limit of rln contract
proc MAX_MESSAGE_LIMIT(): UInt256 {.view.}
# this function returns the merkleProof for a given index
# proc merkleProofElements(index: UInt40): seq[byte] {.view.}
# this function returns the merkle root
proc root(): UInt256 {.view.}
proc maxMembershipRateLimit(): UInt256 {.view.}
# this function returns the merkleProof for a given index
# proc getMerkleProof(index: EthereumUInt40): seq[array[32, byte]] {.view.}
# this function returns the Merkle root
proc root(): Uint256 {.view.}
type
WakuRlnContractWithSender = Sender[WakuRlnContract]
@ -67,11 +73,7 @@ type
proc setMetadata*(
g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
): GroupManagerResult[void] =
let normalizedBlock =
if lastProcessedBlock.isSome():
lastProcessedBlock.get()
else:
g.latestProcessedBlock
let normalizedBlock = lastProcessedBlock.get(g.latestProcessedBlock)
try:
let metadataSetRes = g.rlnInstance.setMetadata(
RlnMetadata(
@ -87,14 +89,68 @@ proc setMetadata*(
return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
return ok()
proc sendEthCallWithChainId(
ethRpc: Web3,
functionSignature: string,
fromAddress: Address,
toAddress: Address,
chainId: UInt256,
): Future[Result[UInt256, string]] {.async.} =
## Workaround for web3 chainId=null issue on some networks (e.g., linea-sepolia)
## Makes contract calls with explicit chainId for view functions with no parameters
let functionHash =
keccak256.digest(functionSignature.toOpenArrayByte(0, functionSignature.len - 1))
let functionSelector = functionHash.data[0 .. 3]
let dataSignature = "0x" & functionSelector.mapIt(it.toHex(2)).join("")
var tx: TransactionArgs
tx.`from` = Opt.some(fromAddress)
tx.to = Opt.some(toAddress)
tx.value = Opt.some(0.u256)
tx.data = Opt.some(byteutils.hexToSeqByte(dataSignature))
tx.chainId = Opt.some(chainId)
let resultBytes = await ethRpc.provider.eth_call(tx, "latest")
if resultBytes.len == 0:
return err("No result returned for function call: " & functionSignature)
return ok(UInt256.fromBytesBE(resultBytes))
proc sendEthCallWithParams(
ethRpc: Web3,
functionSignature: string,
params: seq[byte],
fromAddress: Address,
toAddress: Address,
chainId: UInt256,
): Future[Result[seq[byte], string]] {.async.} =
## Workaround for web3 chainId=null issue with parameterized contract calls
let functionHash =
keccak256.digest(functionSignature.toOpenArrayByte(0, functionSignature.len - 1))
let functionSelector = functionHash.data[0 .. 3]
let callData = functionSelector & params
var tx: TransactionArgs
tx.`from` = Opt.some(fromAddress)
tx.to = Opt.some(toAddress)
tx.value = Opt.some(0.u256)
tx.data = Opt.some(callData)
tx.chainId = Opt.some(chainId)
let resultBytes = await ethRpc.provider.eth_call(tx, "latest")
return ok(resultBytes)
proc fetchMerkleProofElements*(
g: OnchainGroupManager
): Future[Result[seq[byte], string]] {.async.} =
try:
# let merkleRootInvocation = g.wakuRlnContract.get().root()
# let merkleRoot = await merkleRootInvocation.call()
# The above code is not working with the latest web3 version due to chainId being null (specifically on linea-sepolia)
# TODO: find better solution than this custom sendEthCallWithChainId call
let membershipIndex = g.membershipIndex.get()
let index40 = stuint(membershipIndex, 40)
let methodSig = "merkleProofElements(uint40)"
let methodSig = "getMerkleProof(uint40)"
let methodIdDigest = keccak.keccak256.digest(methodSig)
let methodId = methodIdDigest.data[0 .. 3]
@ -111,6 +167,7 @@ proc fetchMerkleProofElements*(
var tx: TransactionArgs
tx.to = Opt.some(fromHex(Address, g.ethContractAddress))
tx.data = Opt.some(callData)
tx.chainId = Opt.some(g.chainId) # Explicitly set the chain ID
let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest")
@ -123,8 +180,17 @@ proc fetchMerkleRoot*(
g: OnchainGroupManager
): Future[Result[UInt256, string]] {.async.} =
try:
let merkleRootInvocation = g.wakuRlnContract.get().root()
let merkleRoot = await merkleRootInvocation.call()
let merkleRoot = (
await sendEthCallWithChainId(
ethRpc = g.ethRpc.get(),
functionSignature = "root()",
fromAddress = g.ethRpc.get().defaultAccount,
toAddress = fromHex(Address, g.ethContractAddress),
chainId = g.chainId,
)
).valueOr:
error "Failed to fetch Merkle root", error = $error
return err("Failed to fetch merkle root: " & $error)
return ok(merkleRoot)
except CatchableError:
error "Failed to fetch Merkle root", error = getCurrentExceptionMsg()
@ -151,6 +217,7 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
return false
let merkleRoot = UInt256ToField(rootRes.get())
if g.validRoots.len == 0:
g.validRoots.addLast(merkleRoot)
return true
@ -183,8 +250,26 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError
error "Failed to fetch Merkle proof", error = proofResult.error
g.merkleProofCache = proofResult.get()
# also need update registerd membership
let memberCount = cast[int64](await wakuRlnContract.commitmentIndex().call())
# also need to update registered membership
# g.rlnRelayMaxMessageLimit =
# cast[uint64](await wakuRlnContract.nextFreeIndex().call())
# The above code is not working with the latest web3 version due to chainId being null (specifically on linea-sepolia)
# TODO: find better solution than this custom sendEthCallWithChainId call
let nextFreeIndex = await sendEthCallWithChainId(
ethRpc = ethRpc,
functionSignature = "nextFreeIndex()",
fromAddress = ethRpc.defaultAccount,
toAddress = fromHex(Address, g.ethContractAddress),
chainId = g.chainId,
)
if nextFreeIndex.isErr():
error "Failed to fetch next free index", error = nextFreeIndex.error
raise newException(
CatchableError, "Failed to fetch next free index: " & nextFreeIndex.error
)
let memberCount = cast[int64](nextFreeIndex.get())
waku_rln_number_registered_memberships.set(float64(memberCount))
await sleepAsync(rpcDelay)
@ -219,15 +304,19 @@ method register*(
var gasPrice: int
g.retryWrapper(gasPrice, "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitmentHex = identityCredential.idCommitment.inHex()
debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex
let idCommitment = identityCredential.idCommitment.toUInt256()
let idCommitmentsToErase: seq[UInt256] = @[]
debug "registering the member",
idCommitment = idCommitment, userMessageLimit = userMessageLimit
idCommitment = idCommitment,
userMessageLimit = userMessageLimit,
idCommitmentsToErase = idCommitmentsToErase
var txHash: TxHash
g.retryWrapper(txHash, "Failed to register the member"):
await wakuRlnContract.register(idCommitment, userMessageLimit.stuint(32)).send(
gasPrice = gasPrice
)
await wakuRlnContract
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
.send(gasPrice = gasPrice)
# wait for the transaction to be mined
var tsReceipt: ReceiptObject
@ -240,27 +329,29 @@ method register*(
debug "ts receipt", receipt = tsReceipt[]
if tsReceipt.status.isNone():
raise newException(ValueError, "register: transaction failed status is None")
raise newException(ValueError, "Transaction failed: status is None")
if tsReceipt.status.get() != 1.Quantity:
raise newException(
ValueError, "register: transaction failed status is: " & $tsReceipt.status.get()
ValueError, "Transaction failed with status: " & $tsReceipt.status.get()
)
let firstTopic = tsReceipt.logs[0].topics[0]
# the hash of the signature of MemberRegistered(uint256,uint32) event is equal to the following hex value
if firstTopic !=
cast[FixedBytes[32]](keccak.keccak256.digest("MemberRegistered(uint256,uint32)").data):
## Extract MembershipRegistered event from transaction logs (third event)
let thirdTopic = tsReceipt.logs[2].topics[0]
debug "third topic", thirdTopic = thirdTopic
if thirdTopic !=
cast[FixedBytes[32]](keccak.keccak256.digest(
"MembershipRegistered(uint256,uint256,uint32)"
).data):
raise newException(ValueError, "register: unexpected event signature")
# the arguments of the raised event i.e., MemberRegistered are encoded inside the data field
# data = rateCommitment encoded as 256 bits || index encoded as 32 bits
let arguments = tsReceipt.logs[0].data
## Parse MembershipRegistered event data: rateCommitment(256) || membershipRateLimit(256) || index(32)
let arguments = tsReceipt.logs[2].data
debug "tx log data", arguments = arguments
let
# In TX log data, uints are encoded in big endian
membershipIndex = UInt256.fromBytesBE(arguments[32 ..^ 1])
## Extract membership index from transaction log data (big endian)
membershipIndex = UInt256.fromBytesBE(arguments[64 .. 95])
debug "parsed membershipIndex", membershipIndex
trace "parsed membershipIndex", membershipIndex
g.userMessageLimit = some(userMessageLimit)
g.membershipIndex = some(membershipIndex.toMembershipIndex())
g.idCredentials = some(identityCredential)
@ -376,7 +467,7 @@ method generateProof*(
var proofValue = cast[ptr array[320, byte]](output_witness_buffer.`ptr`)
let proofBytes: array[320, byte] = proofValue[]
## parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ]
## Parse the proof as [ proof<128> | root<32> | external_nullifier<32> | share_x<32> | share_y<32> | nullifier<32> ]
let
proofOffset = 128
rootOffset = proofOffset + 32
@ -418,9 +509,7 @@ method generateProof*(
return ok(output)
method verifyProof*(
g: OnchainGroupManager, # verifier context
input: seq[byte], # raw message data (signal)
proof: RateLimitProof, # proof received from the peer
g: OnchainGroupManager, input: seq[byte], proof: RateLimitProof
): GroupManagerResult[bool] {.gcsafe, raises: [].} =
## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots --
@ -543,11 +632,31 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
g.membershipIndex = some(keystoreCred.treeIndex)
g.userMessageLimit = some(keystoreCred.userMessageLimit)
# now we check on the contract if the commitment actually has a membership
let idCommitmentBytes = keystoreCred.identityCredential.idCommitment
let idCommitmentUInt256 = keystoreCred.identityCredential.idCommitment.toUInt256()
let idCommitmentHex = idCommitmentBytes.inHex()
debug "Keystore idCommitment in bytes", idCommitmentBytes = idCommitmentBytes
debug "Keystore idCommitment in UInt256 ", idCommitmentUInt256 = idCommitmentUInt256
debug "Keystore idCommitment in hex ", idCommitmentHex = idCommitmentHex
let idCommitment = idCommitmentUInt256
try:
let membershipExists = await wakuRlnContract
.memberExists(keystoreCred.identityCredential.idCommitment.toUInt256())
.call()
if membershipExists == 0:
let commitmentBytes = keystoreCred.identityCredential.idCommitment
let params = commitmentBytes.reversed()
let resultBytes = await sendEthCallWithParams(
ethRpc = g.ethRpc.get(),
functionSignature = "isInMembershipSet(uint256)",
params = params,
fromAddress = ethRpc.defaultAccount,
toAddress = contractAddress,
chainId = g.chainId,
)
if resultBytes.isErr():
return err("Failed to check membership: " & resultBytes.error)
let responseBytes = resultBytes.get()
let membershipExists = responseBytes.len == 32 and responseBytes[^1] == 1'u8
debug "membershipExists", membershipExists = membershipExists
if membershipExists == false:
return err("the commitment does not have a membership")
except CatchableError:
return err("failed to check if the commitment has a membership")
@ -564,8 +673,18 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
if metadata.contractAddress != g.ethContractAddress.toLower():
return err("persisted data: contract address mismatch")
g.rlnRelayMaxMessageLimit =
cast[uint64](await wakuRlnContract.MAX_MESSAGE_LIMIT().call())
let maxMembershipRateLimit = (
await sendEthCallWithChainId(
ethRpc = ethRpc,
functionSignature = "maxMembershipRateLimit()",
fromAddress = ethRpc.defaultAccount,
toAddress = contractAddress,
chainId = g.chainId,
)
).valueOr:
return err("Failed to fetch max membership rate limit: " & $error)
g.rlnRelayMaxMessageLimit = cast[uint64](maxMembershipRateLimit)
proc onDisconnect() {.async.} =
error "Ethereum client disconnected"

View File

@ -122,7 +122,7 @@ proc processRequest(
roundTrips.inc()
debug "sync payload received",
trace "sync payload received",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = recvPayload
@ -141,7 +141,7 @@ proc processRequest(
recvPayload.shards.toPackedSet() == self.shards:
sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
debug "sync payload processed",
trace "sync payload processed",
hash_to_send = hashToSend, hash_to_recv = hashToRecv
sendPayload.cluster = self.cluster
@ -166,7 +166,7 @@ proc processRequest(
return
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)
debug "sync payload sent",
trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = sendPayload
@ -217,7 +217,7 @@ proc initiate(
"remote " & $connection.peerId & " connection write error: " & writeRes.error.msg
)
debug "sync payload sent",
trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = connection.peerId,
payload = initPayload