Merge branch 'master' into feat/mix-poc

Prem Chaitanya Prathi 2025-03-25 17:21:03 +05:30 committed by GitHub
commit 537272d8c2
23 changed files with 8155 additions and 7205 deletions

View File

@ -78,6 +78,11 @@ jobs:
- name: Build binaries
run: make V=1 QUICK_AND_DIRTY_COMPILER=1 all tools
trigger-windows-build:
uses: ./.github/workflows/windows-build.yml
with:
branch: ${{ github.ref }}
test:
needs: changes

.github/workflows/windows-build.yml (new file, 120 lines, vendored)
View File

@ -0,0 +1,120 @@
name: ci / build-windows
on:
workflow_call:
inputs:
branch:
required: true
type: string
jobs:
build:
runs-on: windows-latest
defaults:
run:
shell: msys2 {0}
env:
MSYSTEM: MINGW64
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Setup MSYS2
uses: msys2/setup-msys2@v2
with:
update: true
install: >-
git
base-devel
mingw-w64-x86_64-toolchain
make
cmake
upx
mingw-w64-x86_64-rust
mingw-w64-x86_64-postgresql
mingw-w64-x86_64-gcc
mingw-w64-x86_64-gcc-libs
mingw-w64-x86_64-libwinpthread-git
mingw-w64-x86_64-zlib
mingw-w64-x86_64-openssl
mingw-w64-x86_64-python
mingw-w64-x86_64-cmake
mingw-w64-x86_64-llvm
mingw-w64-x86_64-clang
- name: Add UPX to PATH
run: |
echo "/usr/bin:$PATH" >> $GITHUB_PATH
echo "/mingw64/bin:$PATH" >> $GITHUB_PATH
echo "/usr/lib:$PATH" >> $GITHUB_PATH
echo "/mingw64/lib:$PATH" >> $GITHUB_PATH
- name: Verify dependencies
run: |
which upx gcc g++ make cmake cargo rustc python
- name: Updating submodules
run: git submodule update --init --recursive
- name: Creating tmp directory
run: mkdir -p tmp
- name: Building Nim
run: |
cd vendor/nimbus-build-system/vendor/Nim
./build_all.bat
cd ../../../..
- name: Building libunwind
run: |
cd vendor/nim-libbacktrace
mkdir -p vendor/libunwind/build
pushd vendor/libunwind
cmake -S runtimes \
-DLLVM_ENABLE_RUNTIMES="libunwind" \
-DLIBUNWIND_ENABLE_SHARED=OFF -DLIBUNWIND_ENABLE_STATIC=ON \
-DLIBUNWIND_INCLUDE_DOCS=OFF -DLIBUNWIND_INSTALL_HEADERS=ON \
-DCMAKE_INSTALL_PREFIX="$(pwd)/../install/usr" \
-G "MinGW Makefiles" -B build
cd build
mingw32-make VERBOSE=1 clean
mingw32-make VERBOSE=1 unwind_static
mingw32-make VERBOSE=1 install-unwind
popd
mkdir -p install/usr/lib
cp -r vendor/libunwind/build/lib/libunwind.a install/usr/lib/
- name: Building miniupnpc
run: |
cd vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc
git checkout little_chore_windows_support
make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1
cd ../../../../..
- name: Building libnatpmp
run: |
cd ./vendor/nim-nat-traversal/vendor/libnatpmp-upstream
make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1
cd ../../../../
- name: Building wakunode2
run: |
cd vendor/nim-libbacktrace
cp ./vendor/libunwind/build/lib/libunwind.a install/usr/lib
cd ../..
make wakunode2 LOG_LEVEL=DEBUG V=3 -j8
- name: Check Executable
run: |
if [ -f "./build/wakunode2.exe" ]; then
echo "wakunode2.exe build successful"
else
echo "Build failed: wakunode2.exe not found"
exit 1
fi

View File

@ -0,0 +1,90 @@
---
title: Performance Benchmarks and Test Reports
---
## Introduction
This page summarises key performance metrics for nwaku and provides links to detailed test reports.
> ## TL;DR
>
> - Average Waku bandwidth usage: ~**10 KB/s** (excluding discv5 discovery traffic) for a 1 KB message size and an injection rate of 1 msg/s.
Confirmed for topologies of up to 2000 Relay nodes.
> - Average time for a message to propagate to 100% of nodes: **0.4s** for topologies of up to 2000 Relay nodes.
> - Average per-node bandwidth usage of the discv5 protocol: **8 KB/s** for incoming traffic and **7.4 KB/s** for outgoing traffic,
in a network with 100 continuously online nodes.
> - Future improvements: A messaging API is currently in development to streamline interactions with the Waku protocol suite.
Once completed, it will enable benchmarking at the messaging API level, allowing applications to more easily compare their
own performance results.
## Insights
### Relay Bandwidth Usage: nwaku v0.34.0
The table below shows the average per-node `libp2p` bandwidth usage in a 1000-node Relay network with 1 KB messages at varying injection rates; a rough breakdown of the 1 msg/s figure follows the table.
| Message Injection Rate | Average libp2p incoming bandwidth (KB/s) | Average libp2p outgoing bandwidth (KB/s) |
|------------------------|------------------------------------------|------------------------------------------|
| 1 msg/s | ~10.1 | ~10.3 |
| 1 msg/10s | ~1.8 | ~1.9 |
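
As a rough plausibility check rather than a measured result, and assuming the default gossipsub mesh degree of D = 6, simply forwarding each 1 KB message to the mesh accounts for roughly

$$ 6 \times 1\,\mathrm{KB} \times 1\,\mathrm{msg/s} = 6\,\mathrm{KB/s} $$

per node in each direction; duplicate deliveries and gossip control traffic (IHAVE/IWANT, heartbeats) plausibly account for the remainder of the observed ~10 KB/s.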
### Message Propagation Latency: nwaku v0.34.0-rc1
The message propagation latency is measured as the total time for a message to reach all nodes (see the formula after the table below).
We compare the latency in different network configurations for the following simulation parameters:
- Total messages published: 600
- Message size: 1KB
- Message injection rate: 1msg/s
The different network configurations tested are:
- Relay Config: 1000 nodes with relay enabled
- Mixed Config: 210 nodes, consisting of bootstrap nodes, filter clients and servers, lightpush clients and servers, and store nodes
- Non-persistent Relay Config: 500 persistent relay nodes, 10 store nodes and 100 non-persistent relay nodes
Click on a specific config to see the detailed test report.
| Config | Average Message Propagation Latency (s) | Max Message Propagation Latency (s)|
|------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------|------------------------------------|
| [Relay](https://www.notion.so/Waku-regression-testing-v0-34-1618f96fb65c803bb7bad6ecd6bafff9) (1000 nodes) | 0.05 | 1.6 |
| [Mixed](https://www.notion.so/Mixed-environment-analysis-1688f96fb65c809eb235c59b97d6e15b) (210 nodes) | 0.0125 | 0.007 |
| [Non-persistent Relay](https://www.notion.so/High-Churn-Relay-Store-Reliability-16c8f96fb65c8008bacaf5e86881160c) (510 nodes)| 0.0125 | 0.25 |
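
For clarity, the latency reported above can be read as follows: for a message $m$ published at time $t_{\mathrm{pub}}(m)$ and first received by node $n$ at time $t_{\mathrm{recv}}(m, n)$,

$$ \mathrm{latency}(m) = \max_{n}\, t_{\mathrm{recv}}(m, n) - t_{\mathrm{pub}}(m), $$

i.e. the time until the last node in the topology has received the message. The table reports the average and the maximum of this value over the published messages.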
### Discv5 Bandwidth Usage: nwaku v0.34.0
The table below shows the average per-node discv5 bandwidth usage in a 100-node network, measured with no message injection and with 1 KB or 10 KB messages injected at 1 msg/s; a brief reading of the figures follows the table.
The measurements are based on a stable network where all nodes have already connected to peers to form a healthy mesh.
|Message size |Average discv5 incoming bandwidth (KB/s)|Average discv5 outgoing bandwidth (KB/s)|
|-------------------- |----------------------------------------|----------------------------------------|
| no message injection| 7.88 | 6.70 |
| 1KB | 8.04 | 7.40 |
| 10KB | 8.03 | 7.45 |
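
Read together, these figures indicate that discv5 bandwidth is largely independent of the Waku message load, as expected since discv5 carries only peer-discovery traffic. The total discovery overhead per node is on the order of

$$ 8.0\,\mathrm{KB/s\ (incoming)} + 7.4\,\mathrm{KB/s\ (outgoing)} \approx 15.4\,\mathrm{KB/s}. $$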
## Testing
### DST
The VAC DST team performs regression testing on all new **nwaku** releases, comparing performance with previous versions.
They simulate large Waku networks with a variety of network and protocol configurations that are representative of real-world usage.
**Test Reports**: [DST Reports](https://www.notion.so/DST-Reports-1228f96fb65c80729cd1d98a7496fe6f)
### QA
The VAC QA team performs interoperability tests for **nwaku** and **go-waku** using the latest main branch builds.
These tests run daily and verify functionality by targeting specific features of each protocol.
**Test Reports**: [QA Reports](https://discord.com/channels/1110799176264056863/1196933819614363678)
### nwaku
The **nwaku** team follows a structured release procedure for all release candidates.
This involves deploying release candidates to the `status.staging` fleet for validation and performing sanity checks.
**Release Process**: [nwaku Release Procedure](https://github.com/waku-org/nwaku/blob/master/.github/ISSUE_TEMPLATE/prepare_release.md)
### Research
The Waku Research team conducts a variety of benchmarking, performance testing, proof-of-concept validations and debugging efforts.
They also maintain a Waku simulator designed for small-scale, single-purpose, on-demand testing.
**Test Reports**: [Waku Research Reports](https://www.notion.so/Miscellaneous-2c02516248db4a28ba8cb2797a40d1bb)
**Waku Simulator**: [Waku Simulator Book](https://waku-org.github.io/waku-simulator/)

File diff suppressed because it is too large

View File

@ -224,6 +224,7 @@ suite "Waku Lightpush message delivery":
## When
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
assert res.get() == 1, "Expected to relay the message to 1 node"
## Then
check await completionFutRelay.withTimeout(5.seconds)

View File

@ -1,188 +0,0 @@
{.used.}
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto
import
../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]
suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let timeOrigin = now()
let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
# messages are retrieved when mounting Waku sync
# but based on interval, so this is needed for the client only
for msg in messages:
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await allFutures(server.start(), client.start())
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)
asyncTeardown:
# prevent premature channel shutdown
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncRange = 1.hours,
syncInterval = 5.minutes,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncRange = 10.milliseconds,
syncInterval = 10.milliseconds,
relayJitter = 0.seconds,
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
await allFutures(server.start(), client.start())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "pruning":
for _ in 0 ..< 4:
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == 10
server.wakuSync.storageSize() == 40

View File

@ -10,7 +10,7 @@ import
waku_core,
waku_core/message/digest,
waku_archive/driver/sqlite_driver,
waku_archive
waku_archive,
],
../waku_archive/archive_utils,
../testlib/wakucore
@ -113,15 +113,18 @@ suite "Waku Archive - message handling":
test "convert query to label":
check:
convertQueryToMetricLabel("SELECT version();") == "select_version"
convertQueryToMetricLabel("SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?") == "msg_hash_no_ctopic"
convertQueryToMetricLabel(""" SELECT child.relname AS partition_name
convertQueryToMetricLabel(
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?"
) == "msg_hash_no_ctopic"
convertQueryToMetricLabel(
""" SELECT child.relname AS partition_name
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='messages""") == "get_partitions_list"
WHERE parent.relname='messages"""
) == "get_partitions_list"
procSuite "Waku Archive - find messages":
## Fixtures

View File

@ -7,25 +7,27 @@ declarePublicCounter query_count,
"number of times a query is being performed", labels = ["query"]
## Maps parts of the possible known queries with a fixed and shorter query label.
const QueriesToMetricMap* = toTable({
"contentTopic IN": "content_topic",
"SELECT version()": "select_version",
"WITH min_timestamp": "messages_lookup",
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_no_ctopic",
"AS partition_name": "get_partitions_list",
"SELECT COUNT(1) FROM messages": "count_msgs",
"SELECT messageHash FROM messages WHERE (timestamp, messageHash) < (?,?) AND pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_with_cursor",
"SELECT pg_database_size(current_database())": "get_database_size",
"DELETE FROM messages_lookup WHERE timestamp": "delete_from_msgs_lookup",
"DROP TABLE messages_": "drop_partition_table",
"ALTER TABLE messages DETACH PARTITION": "detach_partition",
"SELECT pg_size_pretty(pg_total_relation_size(C.oid))": "get_partition_size",
"pg_try_advisory_lock": "try_advisory_lock",
"SELECT messageHash FROM messages ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"get_all_msg_hash",
"SELECT pg_advisory_unlock": "advisory_unlock",
"ANALYZE messages": "analyze_messages",
"SELECT EXISTS": "check_version_table_exists",
})
const QueriesToMetricMap* = toTable(
{
"contentTopic IN": "content_topic",
"SELECT version()": "select_version",
"WITH min_timestamp": "messages_lookup",
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_no_ctopic",
"AS partition_name": "get_partitions_list",
"SELECT COUNT(1) FROM messages": "count_msgs",
"SELECT messageHash FROM messages WHERE (timestamp, messageHash) < (?,?) AND pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_with_cursor",
"SELECT pg_database_size(current_database())": "get_database_size",
"DELETE FROM messages_lookup WHERE timestamp": "delete_from_msgs_lookup",
"DROP TABLE messages_": "drop_partition_table",
"ALTER TABLE messages DETACH PARTITION": "detach_partition",
"SELECT pg_size_pretty(pg_total_relation_size(C.oid))": "get_partition_size",
"pg_try_advisory_lock": "try_advisory_lock",
"SELECT messageHash FROM messages ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"get_all_msg_hash",
"SELECT pg_advisory_unlock": "advisory_unlock",
"ANALYZE messages": "analyze_messages",
"SELECT EXISTS": "check_version_table_exists",
}
)

View File

@ -74,6 +74,7 @@ proc getPeer*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo =
)
proc addPeer*(wps: WakuPeerStore, peer: RemotePeerInfo) =
## Only used in tests
wps[AddressBook][peer.peerId] = peer.addrs
wps[ProtoBook][peer.peerId] = peer.protocols
wps[AgentBook][peer.peerId] = peer.agent

View File

@ -374,9 +374,9 @@ proc mountStoreSync*(
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](100)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let idsChannel = newAsyncQueue[SyncID](0)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
var cluster: uint16
var shards: seq[uint16]
@ -397,12 +397,24 @@ proc mountStoreSync*(
node.wakuStoreReconciliation = recon
let reconMountRes = catch:
node.switch.mount(
node.wakuStoreReconciliation, protocolMatcher(WakuReconciliationCodec)
)
if reconMountRes.isErr():
return err(reconMountRes.error.msg)
let transfer = SyncTransfer.new(
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
)
node.wakuStoreTransfer = transfer
let transMountRes = catch:
node.switch.mount(node.wakuStoreTransfer, protocolMatcher(WakuTransferCodec))
if transMountRes.isErr():
return err(transMountRes.error.msg)
return ok()
## Waku relay
@ -1294,7 +1306,7 @@ proc lightpushPublishHandler(
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with legacy lightpush",
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
@ -1302,7 +1314,7 @@ proc lightpushPublishHandler(
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
if not node.wakuLightPush.isNil():
notice "publishing message with self hosted legacy lightpush",
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,

View File

@ -33,7 +33,7 @@ const NoPeerNoneFoundError =
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
proc useSelfHostedLightPush(node: WakuNode): bool =
return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode =
## Lightpush status codes are matching HTTP status codes by design

View File

@ -132,6 +132,9 @@ proc syncMessageIngress*(
pubsubTopic: PubsubTopic,
msg: WakuMessage,
): Future[Result[void, string]] {.async.} =
if msg.ephemeral:
return err("ephemeral message, will not store")
let msgHashHex = msgHash.to0xHex()
trace "handling message in syncMessageIngress",

View File

@ -5,7 +5,6 @@ const
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0"
WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"

View File

@ -30,6 +30,7 @@ const capabilityToCodec = {
Capabilities.Lightpush: WakuLightPushCodec,
Capabilities.Sync: WakuSyncCodec,
Capabilities.Mix: MixProtocolID
Capabilities.Sync: WakuReconciliationCodec,
}.toTable
func init*(

View File

@ -98,6 +98,11 @@ proc handleRequest*(
none[string]()
else:
handleRes.error.desc,
relayPeerCount:
if isSuccess:
some(handleRes.get())
else:
none[uint32](),
)
if not isSuccess:

View File

@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding
const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds
const MaxRegistrationInterval* = 5.minutes
const PeersRequestedCount* = 12
proc computeNamespace*(clusterId: uint16, shard: uint16): string =

View File

@ -32,6 +32,7 @@ type WakuRendezVous* = ref object
relayShard: RelayShards
capabilities: seq[Capabilities]
registrationInterval: timer.Duration
periodicRegistrationFut: Future[void]
proc batchAdvertise*(
@ -42,7 +43,7 @@ proc batchAdvertise*(
): Future[Result[void, string]] {.async: (raises: []).} =
## Register with all rendezvous peers under a namespace
# rendezvous.advertise except already opened connections
# rendezvous.advertise expects already opened connections
# must dial first
var futs = collect(newSeq):
for peerId in peers:
@ -62,7 +63,7 @@ proc batchAdvertise*(
fut.read()
if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg
warn "a rendezvous dial failed", cause = catchable.error.msg
continue
let connOpt = catchable.get()
@ -91,7 +92,7 @@ proc batchRequest*(
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
## Request all records from all rendezvous peers matching a namespace
# rendezvous.request except already opened connections
# rendezvous.request expects already opened connections
# must dial first
var futs = collect(newSeq):
for peerId in peers:
@ -111,7 +112,7 @@ proc batchRequest*(
fut.read()
if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg
warn "a rendezvous dial failed", cause = catchable.error.msg
continue
let connOpt = catchable.get()
@ -143,7 +144,6 @@ proc advertiseAll(
for pubsubTopic in pubsubTopics:
# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)
@ -151,6 +151,9 @@ proc advertiseAll(
# Advertise yourself on that peer
self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch:
await allFinished(futs)
@ -159,7 +162,7 @@ proc advertiseAll(
for fut in catchable.get():
if fut.failed():
error "rendezvous advertisement failed", error = fut.error.msg
warn "a rendezvous advertisement failed", cause = fut.error.msg
debug "waku rendezvous advertisements finished"
@ -178,12 +181,14 @@ proc initialRequestAll*(
# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue
# Ask for peer records for that shard
self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch:
await allFinished(futs)
@ -192,12 +197,13 @@ proc initialRequestAll*(
for fut in catchable.get():
if fut.failed():
error "rendezvous request failed", error = fut.error.msg
warn "a rendezvous request failed", cause = fut.error.msg
elif fut.finished():
let res = fut.value()
let records = res.valueOr:
return err($res.error)
warn "a rendezvous request failed", cause = $res.error
continue
for record in records:
rendezvousPeerFoundTotal.inc()
@ -209,15 +215,23 @@ proc initialRequestAll*(
proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval
interval = self.registrationInterval
# infinite loop
while true:
await sleepAsync(DefaultRegistrationInterval)
await sleepAsync(self.registrationInterval)
(await self.advertiseAll()).isOkOr:
debug "waku rendezvous advertisements failed", error = error
if self.registrationInterval > MaxRegistrationInterval:
self.registrationInterval = MaxRegistrationInterval
else:
self.registrationInterval += self.registrationInterval
# Back to normal interval if no errors
self.registrationInterval = DefaultRegistrationInterval
proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): Result[T, string] {.raises: [].} =
@ -246,6 +260,7 @@ proc new*(
wrv.peerManager = peerManager
wrv.relayshard = relayshard
wrv.capabilities = capabilities
wrv.registrationInterval = DefaultRegistrationInterval
debug "waku rendezvous initialized",
cluster = relayshard.clusterId,

View File

@ -200,14 +200,10 @@ method generateProof*(
messageId = messageId,
).valueOr:
return err("proof generation failed: " & $error)
return ok(proof)
if lastProcessedEpoch != epoch:
lastProcessedEpoch = epoch
waku_rln_proof_remining.set(g.userMessageLimit.get().float64 - 1)
else:
waku_rln_proof_remining.dec()
waku_rln_proofs_generated_total.inc()
waku_rln_remaining_proofs_per_epoch.dec()
waku_rln_total_generated_proofs.inc()
return ok(proof)
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
raise newException(

View File

@ -63,12 +63,12 @@ declarePublicGauge(
)
declarePublicGauge(
waku_rln_proof_remining,
waku_rln_remaining_proofs_per_epoch,
"number of proofs remaining to be generated for the current epoch",
)
declarePublicGauge(
waku_rln_proofs_generated_total,
waku_rln_total_generated_proofs,
"total number of proofs generated since the node started",
)
@ -84,6 +84,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
var cumulativeValidMessages = 0.float64
var cumulativeProofsVerified = 0.float64
var cumulativeProofsGenerated = 0.float64
var cumulativeProofsRemaining = 100.float64
when defined(metrics):
logMetrics = proc() =
@ -102,7 +103,10 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
waku_rln_proof_verification_total, cumulativeProofsVerified
)
let freshProofsGeneratedCount =
parseAndAccumulate(waku_rln_proofs_generated_total, cumulativeProofsGenerated)
parseAndAccumulate(waku_rln_total_generated_proofs, cumulativeProofsGenerated)
let freshProofsRemainingCount = parseAndAccumulate(
waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining
)
info "Total messages", count = freshMsgCount
info "Total spam messages", count = freshSpamCount
@ -111,4 +115,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
info "Total errors", count = freshErrorCount
info "Total proofs verified", count = freshProofsVerifiedCount
info "Total proofs generated", count = freshProofsGeneratedCount
info "Total proofs remaining", count = freshProofsRemainingCount
return logMetrics

View File

@ -89,6 +89,7 @@ type WakuRLNRelay* = ref object of RootObj
groupManager*: GroupManager
onFatalErrorAction*: OnFatalErrorHandler
nonceManager*: NonceManager
epochMonitorFuture*: Future[void]
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
## gets time `t` as `float64` with subseconds resolution in the fractional part
@ -96,6 +97,18 @@ proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
return toEpoch(e)
proc nextEpoch*(rlnPeer: WakuRLNRelay, time: float64): float64 =
let
currentEpoch = uint64(time / rlnPeer.rlnEpochSizeSec.float64)
nextEpochTime = float64(currentEpoch + 1) * rlnPeer.rlnEpochSizeSec.float64
currentTime = epochTime()
# Ensure we always return a future time
if nextEpochTime > currentTime:
return nextEpochTime
else:
return epochTime()
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
## stops the rln-relay protocol
## Throws an error if it cannot stop the rln-relay protocol
@ -392,6 +405,19 @@ proc generateRlnValidator*(
return validator
proc monitorEpochs(wakuRlnRelay: WakuRLNRelay) {.async.} =
while true:
try:
waku_rln_remaining_proofs_per_epoch.set(
wakuRlnRelay.groupManager.userMessageLimit.get().float64
)
except CatchableError:
error "Error in epoch monitoring", error = getCurrentExceptionMsg()
let nextEpochTime = wakuRlnRelay.nextEpoch(epochTime())
let sleepDuration = int((nextEpochTime - epochTime()) * 1000)
await sleepAsync(sleepDuration)
proc mount(
conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler)
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
@ -445,17 +471,19 @@ proc mount(
(await groupManager.startGroupSync()).isOkOr:
return err("could not start the group sync: " & $error)
return ok(
WakuRLNRelay(
groupManager: groupManager,
nonceManager:
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
rlnEpochSizeSec: conf.rlnEpochSizeSec,
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
onFatalErrorAction: conf.onFatalErrorAction,
)
wakuRlnRelay = WakuRLNRelay(
groupManager: groupManager,
nonceManager:
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
rlnEpochSizeSec: conf.rlnEpochSizeSec,
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
onFatalErrorAction: conf.onFatalErrorAction,
)
# Start epoch monitoring in the background
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
return ok(wakuRlnRelay)
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
## returns true if the rln-relay protocol is ready to relay messages
## returns false otherwise

View File

@ -1,8 +1,6 @@
{.push raises: [].}
import
./waku_store_sync/reconciliation,
./waku_store_sync/transfer,
./waku_store_sync/common
./waku_store_sync/reconciliation, ./waku_store_sync/transfer, ./waku_store_sync/common
export reconciliation, transfer, common

View File

@ -68,6 +68,9 @@ type SyncReconciliation* = ref object of LPProtocol
proc messageIngress*(
self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage
) =
if msg.ephemeral:
return
let msgHash = computeMessageHash(pubsubTopic, msg)
let id = SyncID(time: msg.timestamp, hash: msgHash)
@ -78,6 +81,9 @@ proc messageIngress*(
proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
) =
if msg.ephemeral:
return
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
@ -129,10 +135,10 @@ proc processRequest(
sendPayload.shards = self.shards.toSeq()
for hash in hashToSend:
await self.remoteNeedsTx.addLast((conn.peerId, hash))
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
for hash in hashToRecv:
await self.localWantstx.addLast((conn.peerId, hash))
self.localWantsTx.addLastNoWait((conn.peerId, hash))
rawPayload = sendPayload.deltaEncode()

View File

@ -118,6 +118,10 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
error "failed to query archive", error = error
continue
if response.messages.len < 1:
error "failed to fetch message from db"
continue
let msg =
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])