Merge remote-tracking branch 'origin/master' into feat-lmapi-lib

NagyZoltanPeter 2026-02-16 13:53:33 +01:00
commit d0d07c7cc4
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
74 changed files with 2375 additions and 568 deletions

View File

@ -33,6 +33,7 @@ jobs:
make
cmake
upx
unzip
mingw-w64-x86_64-rust
mingw-w64-x86_64-postgresql
mingw-w64-x86_64-gcc
@ -44,6 +45,12 @@ jobs:
mingw-w64-x86_64-cmake
mingw-w64-x86_64-llvm
mingw-w64-x86_64-clang
mingw-w64-x86_64-nasm
- name: Manually install nasm
run: |
bash scripts/install_nasm_in_windows.sh
source $HOME/.bashrc
- name: Add UPX to PATH
run: |
@ -54,7 +61,7 @@ jobs:
- name: Verify dependencies
run: |
which upx gcc g++ make cmake cargo rustc python
which upx gcc g++ make cmake cargo rustc python nasm
- name: Updating submodules
run: git submodule update --init --recursive

View File

@ -8,7 +8,7 @@ ARG LOG_LEVEL=TRACE
ARG HEAPTRACK_BUILD=0
# Get build tools and required header files
RUN apk add --no-cache bash git build-base openssl-dev linux-headers curl jq
RUN apk add --no-cache bash git build-base openssl-dev linux-headers curl jq libbsd-dev
WORKDIR /app
COPY . .
@ -46,7 +46,7 @@ LABEL version="unknown"
EXPOSE 30303 60000 8545
# Referenced in the binary
RUN apk add --no-cache libgcc libpq-dev bind-tools
RUN apk add --no-cache libgcc libpq-dev bind-tools libstdc++
# Copy to separate location to accommodate different MAKE_TARGET values
COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/

View File

@ -43,6 +43,7 @@ ifeq ($(detected_OS),Windows)
LIBS = -lws2_32 -lbcrypt -liphlpapi -luserenv -lntdll -lminiupnpc -lnatpmp -lpq
NIM_PARAMS += $(foreach lib,$(LIBS),--passL:"$(lib)")
NIM_PARAMS += --passL:"-Wl,--allow-multiple-definition"
export PATH := /c/msys64/usr/bin:/c/msys64/mingw64/bin:/c/msys64/usr/lib:/c/msys64/mingw64/lib:$(PATH)

View File

@ -278,6 +278,10 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
pingSuccess = false
error "Ping operation failed or timed out", error = exc.msg
if not pingSuccess:
error "Ping to the node failed", peerId = peer.peerId, conStatus = $conStatus
quit(QuitFailure)
if conStatus in [Connected, CanConnect]:
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]
@ -285,11 +289,6 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
error "Not all protocols are supported",
expected = conf.protocols, supported = nodeProtocols
quit(QuitFailure)
# Check ping result if ping was enabled
if conf.ping and not pingSuccess:
error "Node is reachable and supports protocols but ping failed - connection may be unstable"
quit(QuitFailure)
elif conStatus == CannotConnect:
error "Could not connect", peerId = peer.peerId
quit(QuitFailure)

View File

@ -0,0 +1,19 @@
{.push raises: [].}
import system, std/json
import ./json_base_event
import ../../waku/api/types
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
status*: ConnectionStatus
proc new*(
T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus
): T =
return JsonConnectionStatusChangeEvent(
eventType: "node_health_change",
status: status
)
method `$`*(event: JsonConnectionStatusChangeEvent): string =
$(%*event)
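A hedged sketch, not part of the diff: given the constructor and the $ method above, and assuming std/json's %* serializes the enum field by its name, a registered callback would receive roughly the following payload.

let ev = JsonConnectionStatusChangeEvent.new(ConnectionStatus.Connected)
echo $ev
# assumed output: {"eventType":"node_health_change","status":"Connected"}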

View File

@ -7,9 +7,11 @@ import
./events/json_message_event,
./events/json_topic_health_change_event,
./events/json_connection_change_event,
./events/json_connection_status_change_event,
../waku/factory/app_callbacks,
waku/factory/waku,
waku/node/waku_node,
waku/node/health_monitor/health_status,
./declare_lib
################################################################################
@ -61,10 +63,16 @@ proc waku_new(
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler =
return proc(status: ConnectionStatus) {.async.} =
callEventCallback(ctx, "onConnectionStatusChange"):
$JsonConnectionStatusChangeEvent.new(status)
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx),
connectionStatusChangeHandler: onConnectionStatusChange(ctx)
)
ffi.sendRequestToFFIThread(

View File

@ -1,7 +1,8 @@
#!/usr/bin/env bash
# This script is used to build the rln library for the current platform, or download it from the
# release page if it is available.
# This script is used to build the rln library for the current platform.
# It previously downloaded prebuilt binaries, but due to compatibility issues
# we now always build from source.
set -e
@ -14,41 +15,26 @@ output_filename=$3
[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; }
[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; }
# Get the host triplet
host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}')
echo "Building RLN library from source (version ${rln_version})..."
tarball="${host_triplet}"
tarball+="-stateless"
tarball+="-rln.tar.gz"
# Check if submodule version = version in Makefile
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
# Download the prebuilt rln library if it is available
if curl --silent --fail-with-body -L \
"https://github.com/vacp2p/zerokit/releases/download/$rln_version/$tarball" \
-o "${tarball}";
then
echo "Downloaded ${tarball}"
tar -xzf "${tarball}"
mv "release/librln.a" "${output_filename}"
rm -rf "${tarball}" release
detected_OS=$(uname -s)
if [[ "$detected_OS" == MINGW* || "$detected_OS" == MSYS* ]]; then
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | sed -n 's/.*"name":"rln","version":"\([^"]*\)".*/\1/p')
else
echo "Failed to download ${tarball}"
# Build rln instead
# first, check if submodule version = version in Makefile
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
detected_OS=$(uname -s)
if [[ "$detected_OS" == MINGW* || "$detected_OS" == MSYS* ]]; then
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | sed -n 's/.*"name":"rln","version":"\([^"]*\)".*/\1/p')
else
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | jq -r '.packages[] | select(.name == "rln") | .version')
fi
if [[ "v${submodule_version}" != "${rln_version}" ]]; then
echo "Submodule version (v${submodule_version}) does not match version in Makefile (${rln_version})"
echo "Please update the submodule to ${rln_version}"
exit 1
fi
# if submodule version = version in Makefile, build rln
cargo build --release -p rln --manifest-path "${build_dir}/rln/Cargo.toml"
cp "${build_dir}/target/release/librln.a" "${output_filename}"
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | jq -r '.packages[] | select(.name == "rln") | .version')
fi
if [[ "v${submodule_version}" != "${rln_version}" ]]; then
echo "Submodule version (v${submodule_version}) does not match version in Makefile (${rln_version})"
echo "Please update the submodule to ${rln_version}"
exit 1
fi
# Build rln from source
cargo build --release -p rln --manifest-path "${build_dir}/rln/Cargo.toml"
cp "${build_dir}/target/release/librln.a" "${output_filename}"
echo "Successfully built ${output_filename}"

View File

@ -0,0 +1,37 @@
#!/usr/bin/env sh
set -e
NASM_VERSION="2.16.01"
NASM_ZIP="nasm-${NASM_VERSION}-win64.zip"
NASM_URL="https://www.nasm.us/pub/nasm/releasebuilds/${NASM_VERSION}/win64/${NASM_ZIP}"
INSTALL_DIR="$HOME/.local/nasm"
BIN_DIR="$INSTALL_DIR/bin"
echo "Installing NASM ${NASM_VERSION}..."
# Create directories
mkdir -p "$BIN_DIR"
cd "$INSTALL_DIR"
# Download
if [ ! -f "$NASM_ZIP" ]; then
echo "Downloading NASM..."
curl -LO "$NASM_URL"
fi
# Extract
echo "Extracting..."
unzip -o "$NASM_ZIP"
# Move binaries
cp nasm-*/nasm.exe "$BIN_DIR/"
cp nasm-*/ndisasm.exe "$BIN_DIR/"
# Add to PATH in bashrc (idempotent)
if ! grep -q 'nasm/bin' "$HOME/.bashrc"; then
echo '' >> "$HOME/.bashrc"
echo '# NASM' >> "$HOME/.bashrc"
echo 'export PATH="$HOME/.local/nasm/bin:$PATH"' >> "$HOME/.bashrc"
fi

104
scripts/regenerate_anvil_state.sh Executable file
View File

@ -0,0 +1,104 @@
#!/usr/bin/env bash
# Simple script to regenerate the Anvil state file
# This creates a state file compatible with the current Foundry version
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
STATE_DIR="$PROJECT_ROOT/tests/waku_rln_relay/anvil_state"
STATE_FILE="$STATE_DIR/state-deployed-contracts-mint-and-approved.json"
STATE_FILE_GZ="${STATE_FILE}.gz"
echo "==================================="
echo "Anvil State File Regeneration Tool"
echo "==================================="
echo ""
# Check if Foundry is installed
if ! command -v anvil &> /dev/null; then
echo "ERROR: anvil is not installed!"
echo "Please run: make rln-deps"
exit 1
fi
ANVIL_VERSION=$(anvil --version 2>/dev/null | head -n1)
echo "Using Foundry: $ANVIL_VERSION"
echo ""
# Backup existing state file
if [ -f "$STATE_FILE_GZ" ]; then
BACKUP_FILE="${STATE_FILE_GZ}.backup-$(date +%Y%m%d-%H%M%S)"
echo "Backing up existing state file to: $(basename $BACKUP_FILE)"
cp "$STATE_FILE_GZ" "$BACKUP_FILE"
fi
# Remove old state files
rm -f "$STATE_FILE" "$STATE_FILE_GZ"
echo ""
echo "Running test to generate fresh state file..."
echo "This will:"
echo " 1. Build RLN library"
echo " 2. Start Anvil with state dump enabled"
echo " 3. Deploy contracts"
echo " 4. Save state and compress it"
echo ""
cd "$PROJECT_ROOT"
# Run a single test that deploys contracts
# The test framework will handle state dump
make test tests/waku_rln_relay/test_rln_group_manager_onchain.nim "RLN instances" || {
echo ""
echo "Test execution completed (exit status: $?)"
echo "Checking if state file was generated..."
}
# Check if state file was created
if [ -f "$STATE_FILE" ]; then
echo ""
echo "✓ State file generated: $STATE_FILE"
# Compress it
gzip -c "$STATE_FILE" > "$STATE_FILE_GZ"
echo "✓ Compressed: $STATE_FILE_GZ"
# File sizes
STATE_SIZE=$(du -h "$STATE_FILE" | cut -f1)
GZ_SIZE=$(du -h "$STATE_FILE_GZ" | cut -f1)
echo ""
echo "File sizes:"
echo " Uncompressed: $STATE_SIZE"
echo " Compressed: $GZ_SIZE"
# Optionally remove uncompressed
echo ""
read -p "Remove uncompressed state file? [y/N] " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
rm "$STATE_FILE"
echo "✓ Removed uncompressed file"
fi
echo ""
echo "============================================"
echo "✓ SUCCESS! State file regenerated"
echo "============================================"
echo ""
echo "Next steps:"
echo " 1. Test locally: make test tests/node/test_wakunode_lightpush.nim"
echo " 2. If tests pass, commit: git add $STATE_FILE_GZ"
echo " 3. Push and verify CI passes"
echo ""
else
echo ""
echo "============================================"
echo "✗ ERROR: State file was not generated"
echo "============================================"
echo ""
echo "The state file should have been created at: $STATE_FILE"
echo "Please check the test output above for errors."
exit 1
fi

View File

@ -89,6 +89,7 @@ import
./test_waku_netconfig,
./test_waku_switch,
./test_waku_rendezvous,
./test_waku_metadata,
./waku_discv5/test_waku_discv5
# Waku Keystore test suite

View File

@ -1,3 +1,3 @@
{.used.}
import ./test_entry_nodes, ./test_node_conf
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health

View File

@ -0,0 +1,296 @@
{.used.}
import std/[options, sequtils, times]
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
import ../testlib/[common, wakucore, wakunode, testasync]
import
waku,
waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context],
waku/node/health_monitor/[topic_health, health_status, protocol_health, health_report],
waku/requests/health_requests,
waku/requests/node_requests,
waku/events/health_events,
waku/common/waku_protocol,
waku/factory/waku_conf
const TestTimeout = chronos.seconds(10)
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
const TestContentTopic = ContentTopic("/waku/2/default-content/proto")
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
discard
proc waitForConnectionStatus(
brokerCtx: BrokerContext, expected: ConnectionStatus
) {.async.} =
var future = newFuture[void]("waitForConnectionStatus")
let handler: EventConnectionStatusChangeListenerProc = proc(
e: EventConnectionStatusChange
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.connectionStatus == expected:
future.complete()
let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
if not await future.withTimeout(TestTimeout):
raiseAssert "Timeout waiting for status: " & $expected
finally:
EventConnectionStatusChange.dropListener(brokerCtx, handle)
proc waitForShardHealthy(
brokerCtx: BrokerContext
): Future[EventShardTopicHealthChange] {.async.} =
var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")
let handler: EventShardTopicHealthChangeListenerProc = proc(
e: EventShardTopicHealthChange
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.health == TopicHealth.MINIMALLY_HEALTHY or
e.health == TopicHealth.SUFFICIENTLY_HEALTHY:
future.complete(e)
let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
if await future.withTimeout(TestTimeout):
return future.read()
else:
raiseAssert "Timeout waiting for shard health event"
finally:
EventShardTopicHealthChange.dropListener(brokerCtx, handle)
suite "LM API health checking":
var
serviceNode {.threadvar.}: WakuNode
client {.threadvar.}: Waku
servicePeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
lockNewGlobalBrokerContext:
serviceNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
(await serviceNode.mountRelay()).isOkOr:
raiseAssert error
serviceNode.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert error
await serviceNode.mountLibp2pPing()
await serviceNode.start()
servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo()
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
lockNewGlobalBrokerContext:
let conf = NodeConfig.init(
mode = WakuMode.Core,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
)
client = (await createNode(conf)).valueOr:
raiseAssert error
(await startWaku(addr client)).isOkOr:
raiseAssert error
asyncTeardown:
discard await client.stop()
await serviceNode.stop()
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr:
raiseAssert "RequestShardTopicsHealth failed"
if req.topicHealth.len > 0:
let h = req.topicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
const GhostShard = PubsubTopic("/waku/2/rs/1/666")
client.node.wakuRelay.subscribe(GhostShard, dummyHandler)
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr:
raiseAssert "Request failed"
check req.topicHealth.len > 0
check req.topicHealth[0].health == TopicHealth.UNHEALTHY
asyncTest "RequestProtocolHealth, check relay status":
await client.node.connectToNodes(@[servicePeerInfo])
var isReady = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let relayReq = await RequestProtocolHealth.request(
client.brokerCtx, WakuProtocol.RelayProtocol
)
if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
isReady = true
break
await sleepAsync(chronos.milliseconds(100))
check isReady == true
let storeReq =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
if storeReq.isOk():
check storeReq.get().healthStatus.health != HealthStatus.READY
asyncTest "RequestProtocolHealth, check unmounted protocol":
let req =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
check req.isOk()
let status = req.get().healthStatus
check status.health == HealthStatus.NOT_MOUNTED
check status.desc.isNone()
asyncTest "RequestConnectionStatus, check connectivity state":
let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
check initialReq.connectionStatus == ConnectionStatus.Disconnected
await client.node.connectToNodes(@[servicePeerInfo])
var isConnected = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestConnectionStatus.request(client.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
if req.connectionStatus == ConnectionStatus.PartiallyConnected or
req.connectionStatus == ConnectionStatus.Connected:
isConnected = true
break
await sleepAsync(chronos.milliseconds(100))
check isConnected == true
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
let connectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected)
await client.node.connectToNodes(@[servicePeerInfo])
await connectFuture
let disconnectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected)
await client.node.disconnectNode(servicePeerInfo)
await disconnectFuture
asyncTest "EventShardTopicHealthChange, detect health improvement":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
let healthEventFuture = waitForShardHealthy(client.brokerCtx)
await client.node.connectToNodes(@[servicePeerInfo])
let event = await healthEventFuture
check event.topic == DefaultShard
asyncTest "RequestHealthReport, check aggregate report":
let req = await RequestHealthReport.request(client.brokerCtx)
check req.isOk()
let report = req.get().healthReport
check report.nodeHealth == HealthStatus.READY
check report.protocolsHealth.len > 0
check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol)
asyncTest "RequestContentTopicsHealth, smoke test":
let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic])
check req.isOk()
let res = req.get()
check res.contentTopicHealth.len == 1
check res.contentTopicHealth[0].topic == fictionalTopic
check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED
asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding":
let cTopic = ContentTopic("/waku/2/my-content-topic/proto")
let shardReq =
RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic)
check shardReq.isOk()
let targetShard = $shardReq.get().relayShard
client.node.wakuRelay.subscribe(targetShard, dummyHandler)
serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr:
raiseAssert "Request failed"
if req.contentTopicHealth.len > 0:
let h = req.contentTopicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestProtocolHealth, edge mode smoke test":
var edgeWaku: Waku
lockNewGlobalBrokerContext:
let edgeConf = NodeConfig.init(
mode = WakuMode.Edge,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
messageValidation =
MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)),
),
)
edgeWaku = (await createNode(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error
(await startWaku(addr edgeWaku)).isOkOr:
raiseAssert "Failed to start edge waku: " & error
let relayReq = await RequestProtocolHealth.request(
edgeWaku.brokerCtx, WakuProtocol.RelayProtocol
)
check relayReq.isOk()
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
check not edgeWaku.node.wakuFilterClient.isNil()
discard await edgeWaku.stop()

View File

@ -117,6 +117,9 @@ proc validate(
check requestId == expectedRequestId
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
# allocate random ports to avoid port-already-in-use errors
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
result = NodeConfig.init(
mode = mode,
protocolsConfig = ProtocolsConfig.init(
@ -124,6 +127,7 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
networkingConfig = netConf,
p2pReliability = true,
)
@ -246,8 +250,9 @@ suite "Waku API - Send":
let sendResult = await node.send(envelope)
check sendResult.isErr() # Depending on implementation, it might say "not healthy"
check sendResult.error().contains("not healthy")
# TODO: The API is not enforcing a health check before the send,
# so currently this test cannot successfully fail to send.
check sendResult.isOk()
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error

View File

@ -7,4 +7,5 @@ import
./test_wakunode_peer_exchange,
./test_wakunode_store,
./test_wakunode_legacy_store,
./test_wakunode_peer_manager
./test_wakunode_peer_manager,
./test_wakunode_health_monitor

View File

@ -0,0 +1,301 @@
{.used.}
import
std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results
import
waku/[
waku_core,
common/waku_protocol,
node/waku_node,
node/peer_manager,
node/health_monitor/health_status,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/node_health_monitor,
node/kernel_api/relay,
node/kernel_api/store,
node/kernel_api/lightpush,
node/kernel_api/filter,
waku_archive,
]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
const MockDLow = 4 # Mocked GossipSub DLow value
const TestConnectivityTimeLimit = 3.seconds
proc protoHealthMock(kind: WakuProtocol, health: HealthStatus): ProtocolHealth =
var ph = ProtocolHealth.init(kind)
if health == HealthStatus.READY:
return ph.ready()
else:
return ph.notReady("mock")
suite "Health Monitor - health state calculation":
test "Disconnected, zero peers":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.NOT_READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(FilterClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(LightpushClientProtocol, HealthStatus.NOT_READY),
]
let strength = initTable[WakuProtocol, int]()
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, weak relay":
let weakCount = MockDLow - 1
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Partially connected since relay connectivity is weak (> 0, but < dLow)
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay":
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Fully connected since relay connectivity is ideal (>= dLow)
check state == ConnectionStatus.Connected
test "Connected, robust edge":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.NOT_MOUNTED),
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = HealthyThreshold
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Disconnected, edge missing store":
let protocols =
@[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, edge meets minimum failover requirement":
let weakCount = max(1, HealthyThreshold - 1)
let protocols =
@[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = weakCount
strength[FilterClientProtocol] = weakCount
strength[StoreClientProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay ignores store server":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Connected, robust relay ignores store client":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
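A hedged reading of the rule these cases collectively pin down: when relay is mounted, the state follows relay strength alone (Connected at or above dLow peers, PartiallyConnected between one and dLow - 1, Disconnected at zero), and service-side protocols such as Store are ignored; when relay is absent, all three client protocols (lightpush, filter, store) must be served, yielding Connected when each reaches HealthyThreshold peers, PartiallyConnected when each has at least one, and Disconnected as soon as any has none.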
suite "Health Monitor - events":
asyncTest "Core (relay) health update":
let
nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
(await nodeA.mountRelay()).expect("Node A failed to mount Relay")
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} =
discard
nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node A failed to subscribe"
)
nodeB.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node B failed to subscribe"
)
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
await monitorA.stopHealthMonitor()
await nodeA.stop()
asyncTest "Edge (light client) health update":
let
nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
nodeA.mountLightpushClient()
await nodeA.mountFilterClient()
nodeA.mountStoreClient()
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
lastStatus == ConnectionStatus.Disconnected
await monitorA.stopHealthMonitor()
await nodeA.stop()

View File

@ -1207,3 +1207,233 @@ procSuite "Peer Manager":
r = node1.peerManager.selectPeer(WakuPeerExchangeCodec)
assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec"
asyncTest "selectPeer() filters peers by shard using ENR":
## Given: A peer manager with 3 peers having different shards in their ENRs
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
# Create 3 nodes with different shards
let nodes =
@[
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
),
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId1],
),
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
),
]
await allFutures(nodes.mapIt(it.start()))
for node in nodes:
discard await node.mountRelay()
# Get peer infos with ENRs
let peerInfos = collect:
for node in nodes:
var peerInfo = node.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(node.enr)
peerInfo
# Add all peers to node 0's peer manager and peerstore
for i in 1 .. 2:
nodes[0].peerManager.addPeer(peerInfos[i])
nodes[0].peerManager.switch.peerStore[AddressBook][peerInfos[i].peerId] =
peerInfos[i].addrs
nodes[0].peerManager.switch.peerStore[ProtoBook][peerInfos[i].peerId] =
@[WakuRelayCodec]
## When: We select a peer for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer0 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Only peers supporting shard 0 are considered (node 2, not node 1)
check:
selectedPeer0.isSome()
selectedPeer0.get().peerId != peerInfos[1].peerId # node1 has shard 1
selectedPeer0.get().peerId == peerInfos[2].peerId # node2 has shard 0
## When: We select a peer for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Only peer with shard 1 is selected
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfos[1].peerId # node1 has shard 1
await allFutures(nodes.mapIt(it.stop()))
asyncTest "selectPeer() filters peers by shard using shards field":
## Given: A peer manager with peers having shards in RemotePeerInfo (no ENR)
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng()).withMplex().withNoise().build(),
storage = nil,
)
# Create peer infos with shards field populated (simulating metadata exchange)
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
let peers = toSeq(1 .. 3)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
.filterIt(it.isOk())
.mapIt(it.value)
require:
peers.len == 3
# Manually populate the shards field (ENR is not available)
var peerInfos: seq[RemotePeerInfo] = @[]
for i, peer in peers:
var peerInfo = RemotePeerInfo.init(peer.peerId, peer.addrs)
# Peer 0 and 2 have shard 0, peer 1 has shard 1
peerInfo.shards =
if i == 1:
@[shardId1]
else:
@[shardId0]
# Note: ENR is intentionally left as none
peerInfos.add(peerInfo)
# Add peers to peerstore
for peerInfo in peerInfos:
pm.switch.peerStore[AddressBook][peerInfo.peerId] = peerInfo.addrs
pm.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
# simulate metadata exchange by setting shards field in peerstore
pm.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
## When: We select a peer for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer0 = pm.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Peers with shard 0 in shards field are selected
check:
selectedPeer0.isSome()
selectedPeer0.get().peerId in [peerInfos[0].peerId, peerInfos[2].peerId]
## When: We select a peer for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = pm.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Peer with shard 1 in shards field is selected
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfos[1].peerId
asyncTest "selectPeer() handles invalid pubsub topic gracefully":
## Given: A peer manager with valid peers
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = 0,
subscribeShards = @[0'u16],
)
await node.start()
# Add a peer
let peer =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await peer.start()
discard await peer.mountRelay()
var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(peer.enr)
node.peerManager.addPeer(peerInfo)
node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
## When: selectPeer is called with malformed pubsub topic
let invalidTopics =
@[
some(PubsubTopic("invalid-topic")),
some(PubsubTopic("/waku/2/invalid")),
some(PubsubTopic("/waku/2/rs/abc/0")), # non-numeric cluster
some(PubsubTopic("")), # empty topic
]
## Then: Returns none(RemotePeerInfo) without crashing
for invalidTopic in invalidTopics:
let result = node.peerManager.selectPeer(WakuRelayCodec, invalidTopic)
check:
result.isNone()
await allFutures(node.stop(), peer.stop())
asyncTest "selectPeer() prioritizes ENR over shards field":
## Given: A peer with both ENR and shards field populated
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
)
await node.start()
discard await node.mountRelay()
# Create peer with ENR containing shard 0
let peer = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
)
await peer.start()
discard await peer.mountRelay()
# Create peer info with ENR (shard 0) but set shards field to shard 1
var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(peer.enr) # ENR has shard 0
peerInfo.shards = @[shardId1] # shards field has shard 1
node.peerManager.addPeer(peerInfo)
node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
# simulate metadata exchange by setting shards field in peerstore
node.peerManager.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
## When: We select for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer = node.peerManager.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Peer is selected because ENR (shard 0) takes precedence
check:
selectedPeer.isSome()
selectedPeer.get().peerId == peerInfo.peerId
## When: We select for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = node.peerManager.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Peer is still selected because shards field is checked as fallback
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfo.peerId
await allFutures(node.stop(), peer.stop())

View File

@ -44,8 +44,7 @@ suite "Waku Keepalive":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
let healthMonitor = NodeHealthMonitor()
healthMonitor.setNodeToHealthMonitor(node1)
let healthMonitor = NodeHealthMonitor.new(node1)
healthMonitor.startKeepalive(2.seconds).isOkOr:
assert false, "Failed to start keepalive"

View File

@ -13,14 +13,15 @@ import
eth/keys,
eth/p2p/discoveryv5/enr
import
waku/
[
waku_node,
waku_core/topics,
node/peer_manager,
discovery/waku_discv5,
waku_metadata,
],
waku/[
waku_node,
waku_core/topics,
waku_core,
node/peer_manager,
discovery/waku_discv5,
waku_metadata,
waku_relay/protocol,
],
./testlib/wakucore,
./testlib/wakunode
@ -41,26 +42,86 @@ procSuite "Waku Metadata Protocol":
clusterId = clusterId,
)
# Mount metadata protocol on both nodes before starting
discard node1.mountMetadata(clusterId, @[])
discard node2.mountMetadata(clusterId, @[])
# Mount relay so metadata can track subscriptions
discard await node1.mountRelay()
discard await node2.mountRelay()
# Start nodes
await allFutures([node1.start(), node2.start()])
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7"))
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6"))
# Subscribe to topics on node1 - relay will track these and metadata will report them
let noOpHandler: WakuRelayHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[void] {.async.} =
discard
node1.wakuRelay.subscribe("/waku/2/rs/10/7", noOpHandler)
node1.wakuRelay.subscribe("/waku/2/rs/10/6", noOpHandler)
# Create connection
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
require:
connOpt.isSome
connOpt.isSome()
# Request metadata
let response1 = await node2.wakuMetadata.request(connOpt.get())
# Check the response or don't even continue
require:
response1.isOk
response1.isOk()
check:
response1.get().clusterId.get() == clusterId
response1.get().shards == @[uint32(6), uint32(7)]
await allFutures([node1.stop(), node2.stop()])
asyncTest "Metadata reports configured shards before relay subscription":
## Given: Node with configured shards but no relay subscriptions yet
let
clusterId = 10.uint16
configuredShards = @[uint16(0), uint16(1)]
let node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = configuredShards,
)
let node2 = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
)
# Mount metadata with configured shards on node1
discard node1.mountMetadata(clusterId, configuredShards)
# Mount metadata on node2 so it can make requests
discard node2.mountMetadata(clusterId, @[])
# Start nodes (relay is NOT mounted yet on node1)
await allFutures([node1.start(), node2.start()])
## When: Node2 requests metadata from Node1 before relay is active
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
require:
connOpt.isSome
let response = await node2.wakuMetadata.request(connOpt.get())
## Then: Response contains configured shards even without relay subscriptions
require:
response.isOk()
check:
response.get().clusterId.get() == clusterId
response.get().shards == @[uint32(0), uint32(1)]
await allFutures([node1.stop(), node2.stop()])

View File

@ -10,6 +10,7 @@ import
import
waku/waku_core/peers,
waku/waku_core/codecs,
waku/waku_core,
waku/node/waku_node,
waku/node/peer_manager/peer_manager,
waku/waku_rendezvous/protocol,
@ -81,3 +82,65 @@ procSuite "Waku Rendezvous":
records.len == 1
records[0].peerId == peerInfo1.peerId
#records[0].mixPubKey == $node1.wakuMix.pubKey
asyncTest "Rendezvous advertises configured shards before relay is active":
## Given: A node with configured shards but no relay subscriptions yet
let
clusterId = 10.uint16
configuredShards = @[RelayShard(clusterId: clusterId, shardId: 0)]
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[0'u16],
)
## When: Node mounts rendezvous with configured shards (before relay)
await node.mountRendezvous(clusterId, configuredShards)
await node.start()
## Then: The rendezvous protocol should be mounted successfully
check:
node.wakuRendezvous != nil
# Verify that the protocol is running without errors
# (shards are used internally by the getShardsGetter closure)
let namespace = computeMixNamespace(clusterId)
check:
namespace.len > 0
await node.stop()
asyncTest "Rendezvous uses configured shards when relay not mounted":
## Given: A light client node with no relay protocol
let
clusterId = 10.uint16
configuredShards =
@[
RelayShard(clusterId: clusterId, shardId: 0),
RelayShard(clusterId: clusterId, shardId: 1),
]
let lightClient = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
)
## When: Node mounts rendezvous with configured shards (no relay mounted)
await lightClient.mountRendezvous(clusterId, configuredShards)
await lightClient.start()
## Then: Rendezvous should be mounted successfully without relay
check:
lightClient.wakuRendezvous != nil
lightClient.wakuRelay == nil # Verify relay is not mounted
# Verify the protocol is working (doesn't fail immediately)
# advertiseAll requires peers, so we just check the protocol is initialized
await sleepAsync(100.milliseconds)
check:
lightClient.wakuRendezvous != nil
await lightClient.stop()

View File

@ -27,7 +27,7 @@ import
# TODO: migrate to usage of a test cluster conf
proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
var builder = WakuConfBuilder.init()
builder.withP2pTcpPort(Port(60000))
builder.withP2pTcpPort(Port(0))
builder.withP2pListenAddress(parseIpAddress("0.0.0.0"))
builder.restServerConf.withListenAddress(parseIpAddress("127.0.0.1"))
builder.withDnsAddrsNameServers(
@ -80,7 +80,7 @@ proc newTestWakuNode*(
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
let extPort =
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone():
some(Port(60000))
some(Port(0))
else:
extPort

View File

@ -35,7 +35,7 @@ suite "Waku Message - Deterministic hashing":
byteutils.toHex(message.payload) == "010203045445535405060708"
byteutils.toHex(message.meta) == ""
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
byteutils.toHex(messageHash) ==
"cccab07fed94181c83937c8ca8340c9108492b7ede354a6d95421ad34141fd37"
test "digest computation - meta field (12 bytes)":
@ -69,7 +69,7 @@ suite "Waku Message - Deterministic hashing":
byteutils.toHex(message.payload) == "010203045445535405060708"
byteutils.toHex(message.meta) == "73757065722d736563726574"
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
byteutils.toHex(messageHash) ==
"b9b4852f9d8c489846e8bfc6c5ca6a1a8d460a40d28832a966e029eb39619199"
test "digest computation - meta field (64 bytes)":
@ -104,7 +104,7 @@ suite "Waku Message - Deterministic hashing":
byteutils.toHex(message.meta) ==
"000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
byteutils.toHex(messageHash) ==
"653460d04f66c5b11814d235152f4f246e6f03ef80a305a825913636fbafd0ba"
test "digest computation - zero length payload":
@ -132,7 +132,7 @@ suite "Waku Message - Deterministic hashing":
## Then
check:
messageHash.toHex() ==
byteutils.toHex(messageHash) ==
"0f6448cc23b2db6c696aa6ab4b693eff4cf3549ff346fe1dbeb281697396a09f"
test "waku message - check meta size is enforced":

View File

@ -11,15 +11,15 @@ import
from std/times import epochTime
import
waku/
[
waku_relay,
node/waku_node,
node/peer_manager,
waku_core,
waku_node,
waku_rln_relay,
],
waku/[
waku_relay,
node/waku_node,
node/peer_manager,
waku_core,
waku_node,
waku_rln_relay,
common/broker/broker_context,
],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, futures]

View File

@ -386,7 +386,7 @@ procSuite "WakuNode - Store":
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore((3, 500.millis))
waitFor server.mountStore((3, 200.millis))
client.mountStoreClient()
@ -413,11 +413,11 @@ procSuite "WakuNode - Store":
for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(5.millis)
waitFor sleepAsync(1.millis)
waitFor failsProc()
waitFor sleepAsync(500.millis)
waitFor sleepAsync(200.millis)
for count in 0 ..< 3:
waitFor successProc()

View File

@ -10,6 +10,7 @@ import
libp2p/crypto/crypto
import
waku/[
common/waku_protocol,
waku_node,
node/waku_node as waku_node2,
# TODO: Remove after moving `git_version` to the app code.
@ -50,33 +51,22 @@ suite "Waku v2 REST API - health":
asyncTest "Get node health info - GET /health":
# Given
let node = testWakuNode()
let healthMonitor = NodeHealthMonitor()
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
healthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.httpServer.address.port # update with bound port for client use
let healthMonitor = NodeHealthMonitor.new(node)
installHealthApiHandler(restServer.router, healthMonitor)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# When
var response = await client.healthCheck()
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data ==
HealthReport(nodeHealth: HealthStatus.INITIALIZING, protocolsHealth: @[])
# now kick in rln (currently the only check for health)
# kick in rln (currently the only check for health)
await node.mountRlnRelay(
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
)
@ -84,51 +74,44 @@ suite "Waku v2 REST API - health":
node.mountLightPushClient()
await node.mountFilterClient()
healthMonitor.setNodeToHealthMonitor(node)
# We don't have a Waku, so we need to set the overall health to READY here on its behalf
healthMonitor.setOverallHealth(HealthStatus.READY)
# When
response = await client.healthCheck()
var response = await client.healthCheck()
let report = response.data
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.nodeHealth == HealthStatus.READY
response.data.protocolsHealth.len() == 15
response.data.protocolsHealth[0].protocol == "Relay"
response.data.protocolsHealth[0].health == HealthStatus.NOT_READY
response.data.protocolsHealth[0].desc == some("No connected peers")
response.data.protocolsHealth[1].protocol == "Rln Relay"
response.data.protocolsHealth[1].health == HealthStatus.READY
response.data.protocolsHealth[2].protocol == "Lightpush"
response.data.protocolsHealth[2].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[3].protocol == "Legacy Lightpush"
response.data.protocolsHealth[3].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[4].protocol == "Filter"
response.data.protocolsHealth[4].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[5].protocol == "Store"
response.data.protocolsHealth[5].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[6].protocol == "Legacy Store"
response.data.protocolsHealth[6].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[7].protocol == "Peer Exchange"
response.data.protocolsHealth[7].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[8].protocol == "Rendezvous"
response.data.protocolsHealth[8].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[9].protocol == "Mix"
response.data.protocolsHealth[9].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[10].protocol == "Lightpush Client"
response.data.protocolsHealth[10].health == HealthStatus.NOT_READY
response.data.protocolsHealth[10].desc ==
report.nodeHealth == HealthStatus.READY
report.protocolsHealth.len() == 15
report.getHealth(RelayProtocol).health == HealthStatus.NOT_READY
report.getHealth(RelayProtocol).desc == some("No connected peers")
report.getHealth(RlnRelayProtocol).health == HealthStatus.READY
report.getHealth(LightpushProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyLightpushProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(FilterProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(StoreProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyStoreProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(PeerExchangeProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(RendezvousProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(MixProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LightpushClientProtocol).health == HealthStatus.NOT_READY
report.getHealth(LightpushClientProtocol).desc ==
some("No Lightpush service peer available yet")
response.data.protocolsHealth[11].protocol == "Legacy Lightpush Client"
response.data.protocolsHealth[11].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[12].protocol == "Store Client"
response.data.protocolsHealth[12].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[13].protocol == "Legacy Store Client"
response.data.protocolsHealth[13].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[14].protocol == "Filter Client"
response.data.protocolsHealth[14].health == HealthStatus.NOT_READY
response.data.protocolsHealth[14].desc ==
report.getHealth(LegacyLightpushClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(StoreClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyStoreClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(FilterClientProtocol).health == HealthStatus.NOT_READY
report.getHealth(FilterClientProtocol).desc ==
some("No Filter service peer available yet")
await restServer.stop()

2
vendor/nim-dnsdisc vendored

@ -1 +1 @@
Subproject commit b71d029f4da4ec56974d54c04518bada00e1b623
Subproject commit 203abd2b3e758e0ea3ae325769b20a7e1bcd1010

@ -1 +1 @@
Subproject commit c3ac3f639ed1d62f59d3077d376a29c63ac9750c
Subproject commit ce27581a3e881f782f482cb66dc5b07a02bd615e

@ -1 +1 @@
Subproject commit 79cbab1460f4c0cdde2084589d017c43a3d7b4f1
Subproject commit c53852d9e24205b6363bba517fa8ee7bde823691

@ -1 +1 @@
Subproject commit b65fd6a7e64c864dabe40e7dfd6c7d07db0014ac
Subproject commit c343b0e243d9e17e2c40f3a8a24340f7c4a71d44

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb
Subproject commit ca48c3718246bb411ff0e354a70cb82d9a28de0d

2
vendor/nim-lsquic vendored

@ -1 +1 @@
Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b
Subproject commit 4fb03ee7bfb39aecb3316889fdcb60bec3d0936f

2
vendor/nim-metrics vendored

@ -1 +1 @@
Subproject commit ecf64c6078d1276d3b7d9b3d931fbdb70004db11
Subproject commit 11d0cddfb0e711aa2a8c75d1892ae24a64c299fc

2
vendor/nim-presto vendored

@ -1 +1 @@
Subproject commit 92b1c7ff141e6920e1f8a98a14c35c1fa098e3be
Subproject commit d66043dd7ede146442e6c39720c76a20bde5225f

@ -1 +1 @@
Subproject commit 6f525d5447d97256750ca7856faead03e562ed20
Subproject commit b0f2fa32960ea532a184394b0f27be37bd80248b

@ -1 +1 @@
Subproject commit bdf01cf4236fb40788f0733466cdf6708783cbac
Subproject commit 89ba51f557414d3a3e17ab3df8270e1bdaa3ca2a

2
vendor/nim-stew vendored

@ -1 +1 @@
Subproject commit e5740014961438610d336cd81706582dbf2c96f0
Subproject commit b66168735d6f3841c5239c3169d3fe5fe98b1257

@ -1 +1 @@
Subproject commit 94d68e796c045d5b37cabc6be32d7bfa168f8857
Subproject commit e4d37dc1652d5c63afb89907efb5a5e812261797

@ -1 +1 @@
Subproject commit fea85b27f0badcf617033ca1bc05444b5fd8aa7a
Subproject commit b5b387e6fb2a7cc75d54a269b07cc6218361bd46

@ -1 +1 @@
Subproject commit 8b51e99b4a57fcfb31689230e75595f024543024
Subproject commit 26f2ef3ae0ec72a2a75bfe557e02e88f6a31c189

2
vendor/nim-websock vendored

@ -1 +1 @@
Subproject commit ebe308a79a7b440a11dfbe74f352be86a3883508
Subproject commit 35ae76f1559e835c80f9c1a3943bf995d3dd9eb5

@ -1 +1 @@
Subproject commit 8a338f354481e8a3f3d64a72e38fad4c62e32dcd
Subproject commit d9906ef40f1e113fcf51de4ad27c61aa45375c2d

2
vendor/zerokit vendored

@ -1 +1 @@
Subproject commit 70c79fbc989d4f87d9352b2f4bddcb60ebe55b19
Subproject commit a4bb3feb5054e6fd24827adf204493e6e173437b

View File

@ -1,7 +1,7 @@
import chronicles, chronos, results
import chronicles, chronos, results, std/strutils
import waku/factory/waku
import waku/[requests/health_request, waku_core, waku_node]
import waku/[requests/health_requests, waku_core, waku_node]
import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_service
import libp2p/peerid
@ -26,16 +26,8 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
if w.isNil():
return err("Waku node is not initialized")
# check if health is satisfactory
# If Node is not healthy, return err("Waku node is not healthy")
let healthStatus = RequestNodeHealth.request(w.brokerCtx)
if healthStatus.isErr():
warn "Failed to get Waku node health status: ", error = healthStatus.error
# Let's suppose the node is healthy enough, go ahead
else:
if healthStatus.get().healthStatus == NodeHealth.Unhealthy:
return err("Waku node is not healthy, has got no connections.")
# TODO: Reconcile request-bouncing health checks here with unit testing.
# (For now, better to just allow all sends and rely on retries.)
return ok()

View File

@ -14,10 +14,10 @@ type
RequestId* = distinct string
NodeHealth* {.pure.} = enum
Healthy
MinimallyHealthy
Unhealthy
ConnectionStatus* {.pure.} = enum
Disconnected
PartiallyConnected
Connected
proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T =
## Generate a new RequestId using the provided RNG.

View File

@ -48,8 +48,8 @@ proc check(db: DbConn): Result[void, string] =
return err("exception in check: " & getCurrentExceptionMsg())
if message.len > 0:
let truncatedErr = message[0 .. 80]
## libpq sometimes gives extremely long error messages
let truncatedErr = message[0 ..< min(80, message.len)]
error "postgres check issue. see truncated db error.", error = truncatedErr
return err(truncatedErr)
return ok()
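The change matters because a fixed slice like message[0 .. 80] raises an index error in Nim whenever the message is shorter than 81 characters, while the clamped form is safe for any length. A minimal illustration, not part of the diff:

let short = "connection refused"
doAssert short[0 ..< min(80, short.len)] == short
# the old form, short[0 .. 80], would raise here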

View File

@ -0,0 +1,24 @@
{.push raises: [].}
type WakuProtocol* {.pure.} = enum
RelayProtocol = "Relay"
RlnRelayProtocol = "Rln Relay"
StoreProtocol = "Store"
LegacyStoreProtocol = "Legacy Store"
FilterProtocol = "Filter"
LightpushProtocol = "Lightpush"
LegacyLightpushProtocol = "Legacy Lightpush"
PeerExchangeProtocol = "Peer Exchange"
RendezvousProtocol = "Rendezvous"
MixProtocol = "Mix"
StoreClientProtocol = "Store Client"
LegacyStoreClientProtocol = "Legacy Store Client"
FilterClientProtocol = "Filter Client"
LightpushClientProtocol = "Lightpush Client"
LegacyLightpushClientProtocol = "Legacy Lightpush Client"
const
RelayProtocols* = {RelayProtocol}
StoreClientProtocols* = {StoreClientProtocol, LegacyStoreClientProtocol}
LightpushClientProtocols* = {LightpushClientProtocol, LegacyLightpushClientProtocol}
FilterClientProtocols* = {FilterClientProtocol}
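
A small usage sketch for the new enum and its sets (illustrative, not part of this diff):

import waku/common/waku_protocol

let p = WakuProtocol.LegacyLightpushClientProtocol
assert p in LightpushClientProtocols # the sets group current and legacy codecs
assert $p == "Legacy Lightpush Client" # enum values carry their display strings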

View File

@ -1,3 +1,3 @@
import ./[message_events, delivery_events]
import ./[message_events, delivery_events, health_events, peer_events]
export message_events, delivery_events
export message_events, delivery_events, health_events, peer_events

View File

@ -0,0 +1,27 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health]
import waku/waku_core/topics
export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe
# from/to content topic is provided in the new API (so we know which
# content topics are of interest to the application)
EventBroker:
type EventContentTopicHealthChange* = object
contentTopic*: ContentTopic
health*: TopicHealth
# Notify health changes to a shard (pubsub topic)
EventBroker:
type EventShardTopicHealthChange* = object
topic*: PubsubTopic
health*: TopicHealth
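
A consumer-side sketch (illustrative, not part of this diff; it assumes the listen/dropListener procs generated by the EventBroker macro, matching their use later in this changeset):

import chronicles, chronos
import waku/events/health_events
import waku/common/broker/broker_context

let listener = EventConnectionStatusChange.listen(
  globalBrokerContext(),
  proc(evt: EventConnectionStatusChange): Future[void] {.async: (raises: []), gcsafe.} =
    info "connection status changed", status = $evt.connectionStatus,
).valueOr:
  quit("could not subscribe: " & error)
# ... later, when no longer interested:
EventConnectionStatusChange.dropListener(globalBrokerContext(), listener)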

View File

@ -0,0 +1,13 @@
import waku/common/broker/event_broker
import libp2p/switch
type WakuPeerEventKind* {.pure.} = enum
EventConnected
EventDisconnected
EventIdentified
EventMetadataUpdated
EventBroker:
type EventWakuPeer* = object
peerId*: PeerId
kind*: WakuPeerEventKind
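
An emit-side sketch (illustrative, not part of this diff; the emit signature mirrors the calls added to the peer manager later in this changeset):

import libp2p/peerid
import waku/events/peer_events
import waku/common/broker/broker_context

proc notifyIdentified(ctx: BrokerContext, peerId: PeerId) =
  # Positional emit arguments follow the declared field order (peerId, kind).
  EventWakuPeer.emit(ctx, peerId, WakuPeerEventKind.EventIdentified)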

View File

@ -1,6 +1,7 @@
import ../waku_relay, ../node/peer_manager
import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status
type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler
connectionChangeHandler*: ConnectionChangeHandler
connectionStatusChangeHandler*: ConnectionStatusChangeHandler

View File

@ -15,7 +15,8 @@ import
../waku_node,
../node/peer_manager,
../common/rate_limit/setting,
../common/utils/parse_size_units
../common/utils/parse_size_units,
../common/broker/broker_context
type
WakuNodeBuilder* = object # General

View File

@ -337,7 +337,7 @@ proc setupProtocols(
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
if conf.rendezvous:
await node.mountRendezvous(conf.clusterId)
await node.mountRendezvous(conf.clusterId, shards)
await node.mountRendezvousClient(conf.clusterId)
# Keepalive mounted on all nodes

View File

@ -17,35 +17,36 @@ import
eth/p2p/discoveryv5/enr,
presto,
metrics,
metrics/chronos_httpserver
import
../common/logging,
../waku_core,
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/waku_metrics,
../node/delivery_service/delivery_service,
../rest_api/message_cache,
../rest_api/endpoint/server,
../rest_api/endpoint/builder as rest_server_builder,
../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc,
../discovery/waku_discv5,
../discovery/autonat_service,
../waku_enr/sharding,
../waku_rln_relay,
../waku_store,
../waku_filter_v2,
../factory/node_factory,
../factory/internal_config,
../factory/app_callbacks,
../waku_enr/multiaddr,
./waku_conf,
../common/broker/broker_context,
../requests/health_request,
../api/types
metrics/chronos_httpserver,
waku/[
waku_core,
waku_node,
waku_archive,
waku_rln_relay,
waku_store,
waku_filter_v2,
waku_relay/protocol,
waku_enr/sharding,
waku_enr/multiaddr,
api/types,
common/logging,
common/broker/broker_context,
node/peer_manager,
node/health_monitor,
node/waku_metrics,
node/delivery_service/delivery_service,
rest_api/message_cache,
rest_api/endpoint/server,
rest_api/endpoint/builder as rest_server_builder,
discovery/waku_dnsdisc,
discovery/waku_discv5,
discovery/autonat_service,
requests/health_requests,
factory/node_factory,
factory/internal_config,
factory/app_callbacks,
],
./waku_conf
logScope:
topics = "wakunode waku"
@ -118,7 +119,10 @@ proc newCircuitRelay(isRelayClient: bool): Relay =
return Relay.new()
proc setupAppCallbacks(
node: WakuNode, conf: WakuConf, appCallbacks: AppCallbacks
node: WakuNode,
conf: WakuConf,
appCallbacks: AppCallbacks,
healthMonitor: NodeHealthMonitor,
): Result[void, string] =
if appCallbacks.isNil():
info "No external callbacks to be set"
@ -159,6 +163,13 @@ proc setupAppCallbacks(
err("Cannot configure connectionChangeHandler callback with empty peer manager")
node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler
if not appCallbacks.connectionStatusChangeHandler.isNil():
if healthMonitor.isNil():
return
err("Cannot configure connectionStatusChangeHandler with empty health monitor")
healthMonitor.onConnectionStatusChange = appCallbacks.connectionStatusChangeHandler
return ok()
proc new*(
@ -172,7 +183,13 @@ proc new*(
?wakuConf.validate()
wakuConf.logConf()
let healthMonitor = NodeHealthMonitor.new(wakuConf.dnsAddrsNameServers)
let relay = newCircuitRelay(wakuConf.circuitRelayClient)
let node = (await setupNode(wakuConf, rng, relay)).valueOr:
error "Failed setting up node", error = $error
return err("Failed setting up node: " & $error)
let healthMonitor = NodeHealthMonitor.new(node, wakuConf.dnsAddrsNameServers)
let restServer: WakuRestServerRef =
if wakuConf.restServerConf.isSome():
@ -186,19 +203,7 @@ proc new*(
else:
nil
var relay = newCircuitRelay(wakuConf.circuitRelayClient)
let node = (await setupNode(wakuConf, rng, relay)).valueOr:
error "Failed setting up node", error = $error
return err("Failed setting up node: " & $error)
healthMonitor.setNodeToHealthMonitor(node)
healthMonitor.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore)
healthMonitor.onlineMonitor.addOnlineStateObserver(
node.peerManager.getOnlineStateObserver()
)
node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
node.setupAppCallbacks(wakuConf, appCallbacks, healthMonitor).isOkOr:
error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error)
@ -415,60 +420,48 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
waku[].healthMonitor.startHealthMonitor().isOkOr:
return err("failed to start health monitor: " & $error)
## Setup RequestNodeHealth provider
## Setup RequestConnectionStatus provider
RequestNodeHealth.setProvider(
RequestConnectionStatus.setProvider(
globalBrokerContext(),
proc(): Result[RequestNodeHealth, string] =
let healthReportFut = waku[].healthMonitor.getNodeHealthReport()
if not healthReportFut.completed():
return err("Health report not available")
proc(): Result[RequestConnectionStatus, string] =
try:
let healthReport = healthReportFut.read()
# Check if Relay or Lightpush Client is ready (MinimallyHealthy condition)
var relayReady = false
var lightpushClientReady = false
var storeClientReady = false
var filterClientReady = false
for protocolHealth in healthReport.protocolsHealth:
if protocolHealth.protocol == "Relay" and
protocolHealth.health == HealthStatus.READY:
relayReady = true
elif protocolHealth.protocol == "Lightpush Client" and
protocolHealth.health == HealthStatus.READY:
lightpushClientReady = true
elif protocolHealth.protocol == "Store Client" and
protocolHealth.health == HealthStatus.READY:
storeClientReady = true
elif protocolHealth.protocol == "Filter Client" and
protocolHealth.health == HealthStatus.READY:
filterClientReady = true
# Determine node health based on protocol states
let isMinimallyHealthy = relayReady or lightpushClientReady
let nodeHealth =
if isMinimallyHealthy and storeClientReady and filterClientReady:
NodeHealth.Healthy
elif isMinimallyHealthy:
NodeHealth.MinimallyHealthy
else:
NodeHealth.Unhealthy
debug "Providing health report",
nodeHealth = $nodeHealth,
relayReady = relayReady,
lightpushClientReady = lightpushClientReady,
storeClientReady = storeClientReady,
filterClientReady = filterClientReady,
details = $(healthReport)
return ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError as exc:
err("Failed to read health report: " & exc.msg),
let healthReport = waku[].healthMonitor.getSyncNodeHealthReport()
return
ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus))
except CatchableError:
err("Failed to read health report: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestNodeHealth provider", error = error
error "Failed to set RequestConnectionStatus provider", error = error
## Setup RequestProtocolHealth provider
RequestProtocolHealth.setProvider(
globalBrokerContext(),
proc(
protocol: WakuProtocol
): Future[Result[RequestProtocolHealth, string]] {.async.} =
try:
let protocolHealthStatus =
await waku[].healthMonitor.getProtocolHealthInfo(protocol)
return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus))
except CatchableError:
return err("Failed to get protocol health: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestProtocolHealth provider", error = error
## Setup RequestHealthReport provider (The lost child)
RequestHealthReport.setProvider(
globalBrokerContext(),
proc(): Future[Result[RequestHealthReport, string]] {.async.} =
try:
let report = await waku[].healthMonitor.getNodeHealthReport()
return ok(RequestHealthReport(healthReport: report))
except CatchableError:
return err("Failed to get health report: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestHealthReport provider", error = error
if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport(
@ -527,8 +520,8 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.healthMonitor.isNil():
await waku.healthMonitor.stopHealthMonitor()
## Clear RequestNodeHealth provider
RequestNodeHealth.clearProvider(waku.brokerCtx)
## Clear RequestConnectionStatus provider
RequestConnectionStatus.clearProvider(waku.brokerCtx)
if not waku.restServer.isNil():
await waku.restServer.stop()

View File

@ -1,7 +1,7 @@
import std/options
import chronos, chronicles
import waku/[waku_core], waku/waku_lightpush/[common, rpc]
import waku/requests/health_request
import waku/requests/health_requests
import waku/common/broker/broker_context
import waku/api/types
import ./[delivery_task, send_processor]
@ -32,7 +32,7 @@ proc new*(
)
proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
let healthReport = RequestShardTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
error "isTopicHealthy: failed to get health report", topic = topic, error = error
return false
@ -70,7 +70,9 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} =
if noOfPublishedPeers > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash.to0xHex(), noOfPeers = noOfPublishedPeers
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
noOfPeers = noOfPublishedPeers
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
else:

View File

@ -1,4 +1,9 @@
import
health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status]
health_monitor/[
node_health_monitor, protocol_health, online_monitor, health_status,
connection_status, health_report,
]
export node_health_monitor, protocol_health, online_monitor, health_status
export
node_health_monitor, protocol_health, online_monitor, health_status,
connection_status, health_report

View File

@ -0,0 +1,15 @@
import chronos, results, std/strutils, ../../api/types
export ConnectionStatus
proc init*(
t: typedesc[ConnectionStatus], strRep: string
): Result[ConnectionStatus, string] =
try:
let status = parseEnum[ConnectionStatus](strRep)
return ok(status)
except ValueError:
return err("Invalid ConnectionStatus string representation: " & strRep)
type ConnectionStatusChangeHandler* =
proc(status: ConnectionStatus): Future[void] {.gcsafe, raises: [Defect].}
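
A round-trip sketch (illustrative, not part of this diff):

import results
import waku/node/health_monitor/connection_status

assert ConnectionStatus.init("Connected").get() == ConnectionStatus.Connected
assert ConnectionStatus.init("NotAStatus").isErr() # parse failures surface as err(...)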

View File

@ -0,0 +1,10 @@
{.push raises: [].}
import ./health_status, ./connection_status, ./protocol_health
type HealthReport* = object
## Rest API type returned for /health endpoint
##
nodeHealth*: HealthStatus # legacy "READY" health indicator
connectionStatus*: ConnectionStatus # new "Connected" health indicator
protocolsHealth*: seq[ProtocolHealth]

View File

@ -1,65 +1,89 @@
{.push raises: [].}
import
std/[options, sets, random, sequtils],
std/[options, sets, random, sequtils, json, strutils, tables],
chronos,
chronicles,
libp2p/protocols/rendezvous
import
../waku_node,
../kernel_api,
../../waku_rln_relay,
../../waku_relay,
../peer_manager,
./online_monitor,
./health_status,
./protocol_health
libp2p/protocols/rendezvous,
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
waku/[
waku_relay,
waku_rln_relay,
api/types,
events/health_events,
events/peer_events,
node/waku_node,
node/peer_manager,
node/kernel_api,
node/health_monitor/online_monitor,
node/health_monitor/health_status,
node/health_monitor/health_report,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
]
## This module aims to check the state of the "self" Waku Node
# randomize initializes std/random's random number generator
# if not called, the outcome of randomization procedures will be the same in every run
randomize()
random.randomize()
type
HealthReport* = object
nodeHealth*: HealthStatus
protocolsHealth*: seq[ProtocolHealth]
const HealthyThreshold* = 2
## minimum peers required per service (excluding Relay) for a Connected status
NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
onlineMonitor*: OnlineMonitor
keepAliveFut: Future[void]
type NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
onlineMonitor*: OnlineMonitor
keepAliveFut: Future[void]
healthLoopFut: Future[void]
healthUpdateEvent: AsyncEvent
connectionStatus: ConnectionStatus
onConnectionStatusChange*: ConnectionStatusChangeHandler
cachedProtocols: seq[ProtocolHealth]
## state of each protocol to report.
## recalculated on each event that can change a protocol's state, so fetching a report is fast.
strength: Table[WakuProtocol, int]
## latest known connectivity strength (e.g. connected peer count) metric for each protocol.
## if it doesn't make sense for the protocol in question, this is set to zero.
relayObserver: PubSubObserver
peerEventListener: EventWakuPeerListener
template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
if node.isNil():
warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
return p.notMounted()
func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth =
for h in report.protocolsHealth:
if h.protocol == $kind:
return h
# Shouldn't happen, but if it does, then assume protocol is not mounted
return ProtocolHealth.init(kind)
proc countCapablePeers(hm: NodeHealthMonitor, codec: string): int =
if isNil(hm.node.peerManager):
return 0
return hm.node.peerManager.getCapablePeersCount(codec)
proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Relay")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.RelayProtocol)
if hm.node.wakuRelay == nil:
if isNil(hm.node.wakuRelay):
hm.strength[WakuProtocol.RelayProtocol] = 0
return p.notMounted()
let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
hm.strength[WakuProtocol.RelayProtocol] = 0
return p.notMounted()
if relayPeers.len() == 0:
let count = relayPeers.len
hm.strength[WakuProtocol.RelayProtocol] = count
if count == 0:
return p.notReady("No connected peers")
return p.ready()
proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} =
var p = ProtocolHealth.init("Rln Relay")
if hm.node.isNil():
warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
return p.notMounted()
if hm.node.wakuRlnRelay.isNil():
var p = ProtocolHealth.init(WakuProtocol.RlnRelayProtocol)
if isNil(hm.node.wakuRlnRelay):
return p.notMounted()
const FutIsReadyTimout = 5.seconds
@ -82,131 +106,144 @@ proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.}
proc getLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.LightpushProtocol)
if hm.node.wakuLightPush == nil:
if isNil(hm.node.wakuLightPush):
hm.strength[WakuProtocol.LightpushProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLightPushCodec)
hm.strength[WakuProtocol.LightpushProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLightpushClient == nil:
return p.notMounted()
let selfServiceAvailable =
hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY
let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome()
if selfServiceAvailable or servicePeerAvailable:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushProtocol)
if hm.node.wakuLegacyLightPush == nil:
if isNil(hm.node.wakuLegacyLightPush):
hm.strength[WakuProtocol.LegacyLightpushProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec)
hm.strength[WakuProtocol.LegacyLightpushProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLegacyLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyLightpushClient == nil:
return p.notMounted()
if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth =
var p = ProtocolHealth.init("Filter")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.FilterProtocol)
if hm.node.wakuFilter == nil:
if isNil(hm.node.wakuFilter):
hm.strength[WakuProtocol.FilterProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec)
hm.strength[WakuProtocol.FilterProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Relay is not ready, filter will not be able to sort out messages")
proc getFilterClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Filter Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuFilterClient == nil:
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.StoreProtocol)
if hm.node.wakuStore == nil:
if isNil(hm.node.wakuStore):
hm.strength[WakuProtocol.StoreProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuStoreCodec)
hm.strength[WakuProtocol.StoreProtocol] = peerCount
return p.ready()
proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store Client")
checkWakuNodeNotNil(hm.node, p)
proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LegacyStoreProtocol)
if hm.node.wakuStoreClient == nil:
if isNil(hm.node.wakuLegacyStore):
hm.strength[WakuProtocol.LegacyStoreProtocol] = 0
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil:
let peerCount = hm.countCapablePeers(WakuLegacyStoreCodec)
hm.strength[WakuProtocol.LegacyStoreProtocol] = peerCount
return p.ready()
proc getLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LightpushClientProtocol)
if isNil(hm.node.wakuLightpushClient):
hm.strength[WakuProtocol.LightpushClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLightPushCodec)
hm.strength[WakuProtocol.LightpushClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushClientProtocol)
if isNil(hm.node.wakuLegacyLightpushClient):
hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec)
hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.FilterClientProtocol)
if isNil(hm.node.wakuFilterClient):
hm.strength[WakuProtocol.FilterClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec)
hm.strength[WakuProtocol.FilterClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.StoreClientProtocol)
if isNil(hm.node.wakuStoreClient):
hm.strength[WakuProtocol.StoreClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuStoreCodec)
hm.strength[WakuProtocol.StoreClientProtocol] = peerCount
if peerCount > 0 or not isNil(hm.node.wakuStore):
return p.ready()
return p.notReady(
"No Store service peer available yet, neither Store service set up for the node"
)
proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyStore == nil:
return p.notMounted()
return p.ready()
proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store Client")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.LegacyStoreClientProtocol)
if hm.node.wakuLegacyStoreClient == nil:
if isNil(hm.node.wakuLegacyStoreClient):
hm.strength[WakuProtocol.LegacyStoreClientProtocol] = 0
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
hm.node.wakuLegacyStore != nil:
let peerCount = countCapablePeers(hm, WakuLegacyStoreCodec)
hm.strength[WakuProtocol.LegacyStoreClientProtocol] = peerCount
if peerCount > 0 or not isNil(hm.node.wakuLegacyStore):
return p.ready()
return p.notReady(
@ -214,41 +251,305 @@ proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
)
proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Peer Exchange")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.PeerExchangeProtocol)
if hm.node.wakuPeerExchange == nil:
if isNil(hm.node.wakuPeerExchange):
hm.strength[WakuProtocol.PeerExchangeProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuPeerExchangeCodec)
hm.strength[WakuProtocol.PeerExchangeProtocol] = peerCount
return p.ready()
proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Rendezvous")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.RendezvousProtocol)
if hm.node.wakuRendezvous == nil:
if isNil(hm.node.wakuRendezvous):
hm.strength[WakuProtocol.RendezvousProtocol] = 0
return p.notMounted()
if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
let peerCount = countCapablePeers(hm, RendezVousCodec)
hm.strength[WakuProtocol.RendezvousProtocol] = peerCount
if peerCount == 0:
return p.notReady("No Rendezvous peers are available yet")
return p.ready()
proc getMixHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Mix")
checkWakuNodeNotNil(hm.node, p)
var p = ProtocolHealth.init(WakuProtocol.MixProtocol)
if hm.node.wakuMix.isNil():
if isNil(hm.node.wakuMix):
return p.notMounted()
return p.ready()
proc getSyncProtocolHealthInfo*(
hm: NodeHealthMonitor, protocol: WakuProtocol
): ProtocolHealth =
## Get ProtocolHealth for a given protocol that can provide it synchronously
##
case protocol
of WakuProtocol.RelayProtocol:
return hm.getRelayHealth()
of WakuProtocol.StoreProtocol:
return hm.getStoreHealth()
of WakuProtocol.LegacyStoreProtocol:
return hm.getLegacyStoreHealth()
of WakuProtocol.FilterProtocol:
return hm.getFilterHealth(hm.getRelayHealth().health)
of WakuProtocol.LightpushProtocol:
return hm.getLightpushHealth(hm.getRelayHealth().health)
of WakuProtocol.LegacyLightpushProtocol:
return hm.getLegacyLightpushHealth(hm.getRelayHealth().health)
of WakuProtocol.PeerExchangeProtocol:
return hm.getPeerExchangeHealth()
of WakuProtocol.RendezvousProtocol:
return hm.getRendezvousHealth()
of WakuProtocol.MixProtocol:
return hm.getMixHealth()
of WakuProtocol.StoreClientProtocol:
return hm.getStoreClientHealth()
of WakuProtocol.LegacyStoreClientProtocol:
return hm.getLegacyStoreClientHealth()
of WakuProtocol.FilterClientProtocol:
return hm.getFilterClientHealth()
of WakuProtocol.LightpushClientProtocol:
return hm.getLightpushClientHealth()
of WakuProtocol.LegacyLightpushClientProtocol:
return hm.getLegacyLightpushClientHealth()
of WakuProtocol.RlnRelayProtocol:
# Could waitFor here but we don't want to block the main thread.
# Could also return a cached value from a previous check.
var p = ProtocolHealth.init(protocol)
return p.notReady("RLN Relay health check is async")
else:
var p = ProtocolHealth.init(protocol)
return p.notMounted()
proc getProtocolHealthInfo*(
hm: NodeHealthMonitor, protocol: WakuProtocol
): Future[ProtocolHealth] {.async.} =
## Get ProtocolHealth for a given protocol
##
case protocol
of WakuProtocol.RlnRelayProtocol:
return await hm.getRlnRelayHealth()
else:
return hm.getSyncProtocolHealthInfo(protocol)
proc getSyncAllProtocolHealthInfo(hm: NodeHealthMonitor): seq[ProtocolHealth] =
## Get ProtocolHealth for the subset of protocols that can provide it synchronously
##
var protocols: seq[ProtocolHealth] = @[]
let relayHealth = hm.getRelayHealth()
protocols.add(relayHealth)
protocols.add(hm.getLightpushHealth(relayHealth.health))
protocols.add(hm.getLegacyLightpushHealth(relayHealth.health))
protocols.add(hm.getFilterHealth(relayHealth.health))
protocols.add(hm.getStoreHealth())
protocols.add(hm.getLegacyStoreHealth())
protocols.add(hm.getPeerExchangeHealth())
protocols.add(hm.getRendezvousHealth())
protocols.add(hm.getMixHealth())
protocols.add(hm.getLightpushClientHealth())
protocols.add(hm.getLegacyLightpushClientHealth())
protocols.add(hm.getStoreClientHealth())
protocols.add(hm.getLegacyStoreClientHealth())
protocols.add(hm.getFilterClientHealth())
return protocols
proc getAllProtocolHealthInfo(
hm: NodeHealthMonitor
): Future[seq[ProtocolHealth]] {.async.} =
## Get ProtocolHealth for all protocols
##
var protocols = hm.getSyncAllProtocolHealthInfo()
let rlnHealth = await hm.getRlnRelayHealth()
protocols.add(rlnHealth)
return protocols
proc calculateConnectionState*(
protocols: seq[ProtocolHealth],
strength: Table[WakuProtocol, int], ## latest connectivity strength (e.g. peer count) for a protocol
dLowOpt: Option[int], ## minimum relay peers for Connected status if in Core (Relay) mode
): ConnectionStatus =
var
relayCount = 0
lightpushCount = 0
filterCount = 0
storeClientCount = 0
for p in protocols:
let kind =
try:
parseEnum[WakuProtocol](p.protocol)
except ValueError:
continue
if p.health != HealthStatus.READY:
continue
let strength = strength.getOrDefault(kind, 0)
if kind in RelayProtocols:
relayCount = max(relayCount, strength)
elif kind in StoreClientProtocols:
storeClientCount = max(storeClientCount, strength)
elif kind in LightpushClientProtocols:
lightpushCount = max(lightpushCount, strength)
elif kind in FilterClientProtocols:
filterCount = max(filterCount, strength)
debug "calculateConnectionState",
protocol = kind,
strength = strength,
relayCount = relayCount,
storeClientCount = storeClientCount,
lightpushCount = lightpushCount,
filterCount = filterCount
# Relay connectivity should be a sufficient check in Core mode:
# "Store peers" are relay peers, because incoming relay messages
# are the input to the store server.
# But if the Store server (or even the client) is not mounted as well,
# this logic assumes the user knows what they're doing.
if dLowOpt.isSome():
if relayCount >= dLowOpt.get():
return ConnectionStatus.Connected
if relayCount > 0:
return ConnectionStatus.PartiallyConnected
# No relay connectivity. Relay might not be mounted, or may just have zero peers.
# Fall back to Edge check in any case to be sure.
let canSend = lightpushCount > 0
let canReceive = filterCount > 0
let canStore = storeClientCount > 0
let meetsMinimum = canSend and canReceive and canStore
if not meetsMinimum:
return ConnectionStatus.Disconnected
let isEdgeRobust =
(lightpushCount >= HealthyThreshold) and (filterCount >= HealthyThreshold) and
(storeClientCount >= HealthyThreshold)
if isEdgeRobust:
return ConnectionStatus.Connected
return ConnectionStatus.PartiallyConnected
proc calculateConnectionState*(hm: NodeHealthMonitor): ConnectionStatus =
let dLow =
if isNil(hm.node.wakuRelay):
none(int)
else:
some(hm.node.wakuRelay.parameters.dLow)
return calculateConnectionState(hm.cachedProtocols, hm.strength, dLow)
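A worked sketch of the fallback decision (illustrative values, not part of this diff; it assumes the module paths and the ProtocolHealth.ready() helper as used in this changeset):

import std/[options, tables]
import waku/node/health_monitor/[node_health_monitor, protocol_health, connection_status]

var strength = initTable[WakuProtocol, int]()
strength[WakuProtocol.LightpushClientProtocol] = 2
strength[WakuProtocol.FilterClientProtocol] = 2
strength[WakuProtocol.StoreClientProtocol] = 1

var lp = ProtocolHealth.init(WakuProtocol.LightpushClientProtocol)
var fc = ProtocolHealth.init(WakuProtocol.FilterClientProtocol)
var sc = ProtocolHealth.init(WakuProtocol.StoreClientProtocol)
let protocols = @[lp.ready(), fc.ready(), sc.ready()]

# Relay is not mounted, so dLow is absent and the edge check applies:
# the node can send, receive and store, but only one Store Client peer
# is known (below HealthyThreshold = 2), hence PartiallyConnected.
assert calculateConnectionState(protocols, strength, none(int)) ==
  ConnectionStatus.PartiallyConnected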
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
## Get a HealthReport that includes all protocols
##
var report: HealthReport
if hm.nodeHealth == HealthStatus.INITIALIZING or
hm.nodeHealth == HealthStatus.SHUTTING_DOWN:
report.nodeHealth = hm.nodeHealth
report.connectionStatus = ConnectionStatus.Disconnected
return report
if hm.cachedProtocols.len == 0:
hm.cachedProtocols = await hm.getAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
proc getSyncNodeHealthReport*(hm: NodeHealthMonitor): HealthReport =
## Get a HealthReport covering the subset of protocols that report their health synchronously
##
var report: HealthReport
if hm.nodeHealth == HealthStatus.INITIALIZING or
hm.nodeHealth == HealthStatus.SHUTTING_DOWN:
report.nodeHealth = hm.nodeHealth
report.connectionStatus = ConnectionStatus.Disconnected
return report
if hm.cachedProtocols.len == 0:
hm.cachedProtocols = hm.getSyncAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
proc onRelayMsg(
hm: NodeHealthMonitor, peer: PubSubPeer, msg: var RPCMsg
) {.gcsafe, raises: [].} =
## Inspect Relay events for health-update relevance in Core (Relay) mode.
##
## For Core (Relay) mode, the connectivity health state is mostly determined
## by the relay protocol state (it is the dominant factor), and we know
## that a remote peer's Relay can only affect this node's Relay health via a
## subscription change or a mesh (GRAFT/PRUNE) change.
##
if msg.subscriptions.len == 0:
if msg.control.isNone():
return
let ctrl = msg.control.get()
if ctrl.graft.len == 0 and ctrl.prune.len == 0:
return
hm.healthUpdateEvent.fire()
proc healthLoop(hm: NodeHealthMonitor) {.async.} =
## Re-evaluate the global health state of the node when notified of a potential change,
## and call back the application if an actual change from the last notified state happened.
info "Health monitor loop start"
while true:
try:
await hm.healthUpdateEvent.wait()
hm.healthUpdateEvent.clear()
hm.cachedProtocols = await hm.getAllProtocolHealthInfo()
let newConnectionStatus = hm.calculateConnectionState()
if newConnectionStatus != hm.connectionStatus:
hm.connectionStatus = newConnectionStatus
EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus)
if not isNil(hm.onConnectionStatusChange):
await hm.onConnectionStatusChange(newConnectionStatus)
except CancelledError:
break
except Exception as e:
error "HealthMonitor: error in update loop", error = e.msg
# safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds)
info "Health monitor loop end"
proc selectRandomPeersForKeepalive(
node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int
): Future[seq[PeerId]] {.async.} =
## Select peers for random keepalive, prioritizing mesh peers
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
return selectRandomPeers(outPeers, numRandomPeers)
let meshPeers = node.wakuRelay.getPeersInMesh().valueOr:
@ -382,55 +683,67 @@ proc startKeepalive*(
hm.keepAliveFut = hm.node.keepAliveLoop(randomPeersKeepalive, allPeersKeepalive)
return ok()
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
var report: HealthReport
report.nodeHealth = hm.nodeHealth
if not hm.node.isNil():
let relayHealth = hm.getRelayHealth()
report.protocolsHealth.add(relayHealth)
report.protocolsHealth.add(await hm.getRlnRelayHealth())
report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreHealth())
report.protocolsHealth.add(hm.getLegacyStoreHealth())
report.protocolsHealth.add(hm.getPeerExchangeHealth())
report.protocolsHealth.add(hm.getRendezvousHealth())
report.protocolsHealth.add(hm.getMixHealth())
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreClientHealth())
report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
return report
proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) =
hm.node = node
proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
hm.nodeHealth = health
proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
hm.onlineMonitor.startOnlineMonitor()
if isNil(hm.node.peerManager):
return err("startHealthMonitor: no node peerManager to monitor")
if not isNil(hm.node.wakuRelay):
hm.relayObserver = PubSubObserver(
onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} =
hm.onRelayMsg(peer, msgs)
)
hm.node.wakuRelay.addObserver(hm.relayObserver)
hm.peerEventListener = EventWakuPeer.listen(
hm.node.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
## Recompute health on any peer event (join, leave, identify, metadata update)
hm.healthUpdateEvent.fire(),
).valueOr:
return err("Failed to subscribe to peer events: " & error)
hm.healthUpdateEvent = newAsyncEvent()
hm.healthUpdateEvent.fire()
hm.healthLoopFut = hm.healthLoop()
hm.startKeepalive().isOkOr:
return err("startHealthMonitor: failed starting keep alive: " & error)
return ok()
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
if not hm.onlineMonitor.isNil():
if not isNil(hm.onlineMonitor):
await hm.onlineMonitor.stopOnlineMonitor()
if not hm.keepAliveFut.isNil():
if not isNil(hm.keepAliveFut):
await hm.keepAliveFut.cancelAndWait()
if not isNil(hm.healthLoopFut):
await hm.healthLoopFut.cancelAndWait()
if hm.peerEventListener.id != 0:
EventWakuPeer.dropListener(hm.node.brokerCtx, hm.peerEventListener)
if not isNil(hm.node.wakuRelay) and not isNil(hm.relayObserver):
hm.node.wakuRelay.removeObserver(hm.relayObserver)
proc new*(
T: type NodeHealthMonitor,
node: WakuNode,
dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
): T =
let om = OnlineMonitor.init(dnsNameServers)
om.setPeerStoreToOnlineMonitor(node.switch.peerStore)
om.addOnlineStateObserver(node.peerManager.getOnlineStateObserver())
T(
nodeHealth: INITIALIZING,
node: nil,
onlineMonitor: OnlineMonitor.init(dnsNameServers),
node: node,
onlineMonitor: om,
connectionStatus: ConnectionStatus.Disconnected,
strength: initTable[WakuProtocol, int](),
)

View File

@ -1,5 +1,8 @@
import std/[options, strformat]
import ./health_status
import waku/common/waku_protocol
export waku_protocol
type ProtocolHealth* = object
protocol*: string
@ -39,8 +42,7 @@ proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth =
proc `$`*(p: ProtocolHealth): string =
return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}"
proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
let p = ProtocolHealth(
protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
proc init*(p: typedesc[ProtocolHealth], protocol: WakuProtocol): ProtocolHealth =
return ProtocolHealth(
protocol: $protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
)
return p

View File

@ -1,27 +1,31 @@
{.push raises: [].}
import
std/[options, sets, sequtils, times, strformat, strutils, math, random, tables],
std/
[
options, sets, sequtils, times, strformat, strutils, math, random, tables,
algorithm,
],
chronos,
chronicles,
metrics,
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver,
libp2p/peerstore
import
../../common/nimchronos,
../../common/enr,
../../common/callbacks,
../../common/utils/parse_size_units,
../../waku_core,
../../waku_relay,
../../waku_relay/protocol,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
../health_monitor/online_monitor,
libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore],
waku/[
waku_core,
waku_relay,
waku_metadata,
waku_core/topics/sharding,
waku_relay/protocol,
waku_enr/sharding,
waku_enr/capabilities,
events/peer_events,
common/nimchronos,
common/enr,
common/callbacks,
common/utils/parse_size_units,
common/broker/broker_context,
node/health_monitor/online_monitor,
],
./peer_store/peer_storage,
./waku_peer_store
@ -84,6 +88,7 @@ type ConnectionChangeHandler* = proc(
): Future[void] {.gcsafe, raises: [Defect].}
type PeerManager* = ref object of RootObj
brokerCtx: BrokerContext
switch*: Switch
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
@ -222,7 +227,19 @@ proc selectPeer*(
protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
if shard.isSome():
peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get())))
# Parse the shard from the pubsub topic to get cluster and shard ID
let shardInfo = RelayShard.parse(shard.get()).valueOr:
trace "Failed to parse shard from pubsub topic", topic = shard.get()
return none(RemotePeerInfo)
# Filter peers that support the requested shard
# Check both ENR (if present) and the shards field on RemotePeerInfo
peers.keepItIf(
# Check ENR if available
(it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
# Otherwise check the shards field directly
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
)
shuffle(peers)
@ -483,8 +500,9 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol
## Returns the PeerIds of peers with an active socket connection.
## If a protocol is specified, it returns peers that currently have one
## or more active logical streams for that protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
@ -500,6 +518,65 @@ proc connectedPeers*(
return (inPeers, outPeers)
proc capablePeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the PeerIds of peers with an active socket connection.
## If a protocol is specified, it returns peers that have identified
## themselves as supporting the protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
for peerId, muxers in pm.switch.connManager.getConnections():
# filter out peers that don't have the capability registered in the peer store
if pm.switch.peerStore.hasPeer(peerId, protocol):
for peerConn in muxers:
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
return (inPeers, outPeers)
proc getConnectedPeersCount*(pm: PeerManager, protocol: string): int =
## Returns the total number of unique connected peers (inbound + outbound)
## with active streams for a specific protocol.
let (inPeers, outPeers) = pm.connectedPeers(protocol)
var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len))
for p in inPeers:
peers.incl(p)
for p in outPeers:
peers.incl(p)
return peers.len
proc getCapablePeersCount*(pm: PeerManager, protocol: string): int =
## Returns the total number of unique connected peers (inbound + outbound)
## who have identified themselves as supporting the given protocol.
let (inPeers, outPeers) = pm.capablePeers(protocol)
var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len))
for p in inPeers:
peers.incl(p)
for p in outPeers:
peers.incl(p)
return peers.len
proc getPeersForShard*(pm: PeerManager, protocolId: string, shard: PubsubTopic): int =
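  ## Counts the connected peers of `protocolId` that are known (via ENR or
  ## metadata) to participate in `shard`; falls back to the raw protocol peer
  ## count when the topic string cannot be parsed into a shard.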
let (inPeers, outPeers) = pm.connectedPeers(protocolId)
let connectedProtocolPeers = inPeers & outPeers
if connectedProtocolPeers.len == 0:
return 0
let shardInfo = RelayShard.parse(shard).valueOr:
# count raw peers of the given protocol if for some reason we can't get
# a shard mapping out of the gossipsub topic string.
return connectedProtocolPeers.len
var shardPeers = 0
for peerId in connectedProtocolPeers:
if pm.switch.peerStore.hasShard(peerId, shardInfo.clusterId, shardInfo.shardId):
shardPeers.inc()
return shardPeers
proc disconnectAllPeers*(pm: PeerManager) {.async.} =
let (inPeerIds, outPeerIds) = pm.connectedPeers()
let connectedPeers = concat(inPeerIds, outPeerIds)
@ -635,7 +712,7 @@ proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
# Event Handling #
#~~~~~~~~~~~~~~~~~#
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let res = catch:
await pm.switch.dial(peerId, WakuMetadataCodec)
@ -664,6 +741,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let shards = metadata.shards.mapIt(it.uint16)
pm.switch.peerStore.setShardInfo(peerId, shards)
# TODO: should only trigger an event if metadata actually changed
# should include the shard subscription delta in the event when
# it is a MetadataUpdated event
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
return
info "disconnecting from peer", peerId = peerId, reason = reason
@ -673,14 +754,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.onPeerMetadata(peerId)
await pm.refreshPeerMetadata(peerId)
var peerStore = pm.switch.peerStore
var direction: PeerDirection
var connectedness: Connectedness
case event.kind
of Joined:
of PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
@ -708,10 +789,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
peerStore.delete(peerId)
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
if not pm.onConnectionChange.isNil():
# we don't wait for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Joined)
of Left:
of PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect
@ -723,12 +806,16 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
pm.ipTable.del(ip)
break
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
if not pm.onConnectionChange.isNil():
# we don't wait for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Left)
of Identified:
of PeerEventKind.Identified:
info "event identified", peerId = peerId
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
peerStore[ConnectionBook][peerId] = connectedness
peerStore[DirectionBook][peerId] = direction
@ -1085,8 +1172,11 @@ proc new*(
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
let brokerCtx = globalBrokerContext()
let pm = PeerManager(
switch: switch,
brokerCtx: brokerCtx,
wakuMetadata: wakuMetadata,
storage: storage,
initialBackoffInSec: initialBackoffInSec,

View File

@ -162,7 +162,9 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerId): Connectedness =
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
return
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) or
peerStore[ShardBook].book.getOrDefault(peerId, @[]).contains(shard)
proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
@ -219,7 +221,8 @@ proc getPeersByShard*(
peerStore: PeerStore, cluster, shard: uint16
): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(
it.enr.isSome() and it.enr.get().containsShard(cluster, shard)
(it.enr.isSome() and it.enr.get().containsShard(cluster, shard)) or
it.shards.contains(shard)
)
proc getPeersByCapability*(

View File

@ -42,6 +42,7 @@ import
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
@ -57,12 +58,18 @@ import
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
common/broker/broker_context,
common/broker/request_broker,
waku_mix,
requests/node_requests,
common/broker/broker_context,
requests/health_requests,
events/health_events,
events/peer_events,
],
./net_config,
./peer_manager
./peer_manager,
./health_monitor/health_status,
./health_monitor/topic_health
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
@ -91,6 +98,9 @@ const clientId* = "Nimbus Waku v2 node"
const WakuNodeVersionString* = "version / git commit hash: " & git_version
const EdgeTopicHealthyThreshold = 2
## Minimum filter and lightpush service peers required for a healthy topic in edge mode
# key and crypto modules different
type
# TODO: Move to application instance (e.g., `WakuNode2`)
@ -135,6 +145,10 @@ type
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent
edgeHealthLoop: Future[void]
peerEventListener*: EventWakuPeerListener
proc deduceRelayShard(
node: WakuNode,
@ -153,20 +167,28 @@ proc deduceRelayShard(
return err("Invalid topic:" & pubsubTopic & " " & $error)
return ok(shard)
proc getShardsGetter(node: WakuNode): GetShards =
proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
# fetch pubsubTopics subscribed to relay and convert them to shards
if node.wakuRelay.isNil():
return @[]
# If relay is not mounted, return configured shards
return configuredShards
let subscribedTopics = node.wakuRelay.subscribedTopics()
# If relay hasn't subscribed to any topics yet, return configured shards
if subscribedTopics.len == 0:
return configuredShards
let relayShards = topicsToRelayShards(subscribedTopics).valueOr:
error "could not convert relay topics to shards",
error = $error, topics = subscribedTopics
return @[]
# Fall back to configured shards on error
return configuredShards
if relayShards.isSome():
let shards = relayShards.get().shardIds
return shards
return @[]
return configuredShards
proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
@ -213,7 +235,7 @@ proc new*(
rateLimitSettings: rateLimitSettings,
)
peerManager.setShardGetter(node.getShardsGetter())
peerManager.setShardGetter(node.getShardsGetter(@[]))
return node
@ -258,7 +280,7 @@ proc mountMetadata*(
if not node.wakuMetadata.isNil():
return err("Waku metadata already mounted, skipping")
let metadata = WakuMetadata.new(clusterId, node.getShardsGetter())
let metadata = WakuMetadata.new(clusterId, node.getShardsGetter(shards))
node.wakuMetadata = metadata
node.peerManager.wakuMetadata = metadata
@ -399,14 +421,18 @@ proc mountRendezvousClient*(node: WakuNode, clusterId: uint16) {.async: (raises:
if node.started:
await node.wakuRendezvousClient.start()
proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
proc mountRendezvous*(
node: WakuNode, clusterId: uint16, shards: seq[RelayShard] = @[]
) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"
let configuredShards = shards.mapIt(it.shardId)
node.wakuRendezvous = WakuRendezVous.new(
node.switch,
node.peerManager,
clusterId,
node.getShardsGetter(),
node.getShardsGetter(configuredShards),
node.getCapabilitiesGetter(),
node.getWakuPeerRecordGetter(),
).valueOr:
@ -469,7 +495,52 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
return ok()
proc startProvidersAndListeners(node: WakuNode) =
proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
let filterPeers =
node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard)
let lightpushPeers =
node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard)
if filterPeers >= EdgeTopicHealthyThreshold and
lightpushPeers >= EdgeTopicHealthyThreshold:
return TopicHealth.SUFFICIENTLY_HEALTHY
elif filterPeers > 0 and lightpushPeers > 0:
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.UNHEALTHY
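For instance (illustrative counts): with two filter peers but a single lightpush peer on a shard, the first branch fails and the topic is MINIMALLY_HEALTHY; it becomes SUFFICIENTLY_HEALTHY only once both counts reach EdgeTopicHealthyThreshold.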
proc loopEdgeHealth(node: WakuNode) {.async.} =
while node.started:
await node.edgeHealthEvent.wait()
node.edgeHealthEvent.clear()
try:
for shard in node.edgeTopicsHealth.keys:
if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
continue
let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY)
let newHealth = node.calculateEdgeTopicHealth(shard)
if newHealth != oldHealth:
node.edgeTopicsHealth[shard] = newHealth
EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth)
except CancelledError:
break
except CatchableError as e:
warn "Error in edge health check", error = e.msg
# safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds)
proc startProvidersAndListeners*(node: WakuNode) =
node.peerEventListener = EventWakuPeer.listen(
node.brokerCtx,
proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} =
node.edgeHealthEvent.fire(),
).valueOr:
error "Failed to listen to peer events", error = error
return
RequestRelayShard.setProvider(
node.brokerCtx,
proc(
@ -481,8 +552,60 @@ proc startProvidersAndListeners(node: WakuNode) =
).isOkOr:
error "Can't set provider for RequestRelayShard", error = error
proc stopProvidersAndListeners(node: WakuNode) =
RequestShardTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string] =
var response: RequestShardTopicsHealth
for shard in topics:
var healthStatus = TopicHealth.UNHEALTHY
if not node.wakuRelay.isNil:
healthStatus =
node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
if healthStatus == TopicHealth.NOT_SUBSCRIBED:
healthStatus = node.calculateEdgeTopicHealth(shard)
response.topicHealth.add((shard, healthStatus))
return ok(response),
).isOkOr:
error "Can't set provider for RequestShardTopicsHealth", error = error
RequestContentTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string] =
var response: RequestContentTopicsHealth
for contentTopic in topics:
var topicHealth = TopicHealth.NOT_SUBSCRIBED
let shardResult = node.deduceRelayShard(contentTopic, none[PubsubTopic]())
if shardResult.isOk():
let shardObj = shardResult.get()
let pubsubTopic = $shardObj
if not isNil(node.wakuRelay):
topicHealth = node.wakuRelay.topicsHealth.getOrDefault(
pubsubTopic, TopicHealth.NOT_SUBSCRIBED
)
if topicHealth == TopicHealth.NOT_SUBSCRIBED and
pubsubTopic in node.edgeTopicsHealth:
topicHealth = node.calculateEdgeTopicHealth(pubsubTopic)
response.contentTopicHealth.add((topic: contentTopic, health: topicHealth))
return ok(response),
).isOkOr:
error "Can't set provider for RequestContentTopicsHealth", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
EventWakuPeer.dropListener(node.brokerCtx, node.peerEventListener)
RequestRelayShard.clearProvider(node.brokerCtx)
RequestContentTopicsHealth.clearProvider(node.brokerCtx)
RequestShardTopicsHealth.clearProvider(node.brokerCtx)
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
@ -532,6 +655,9 @@ proc start*(node: WakuNode) {.async.} =
## The switch will update addresses after start using the addressMapper
await node.switch.start()
node.edgeHealthEvent = newAsyncEvent()
node.edgeHealthLoop = loopEdgeHealth(node)
node.startProvidersAndListeners()
node.started = true
@ -549,6 +675,10 @@ proc stop*(node: WakuNode) {.async.} =
node.stopProvidersAndListeners()
if not node.edgeHealthLoop.isNil:
await node.edgeHealthLoop.cancelAndWait()
node.edgeHealthLoop = nil
await node.switch.stop()
node.peerManager.stop()

View File

@ -1,21 +0,0 @@
import waku/common/broker/[request_broker, multi_request_broker]
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health]
import waku/waku_core/topics
export protocol_health, topic_health
RequestBroker(sync):
type RequestNodeHealth* = object
healthStatus*: NodeHealth
RequestBroker(sync):
type RequestRelayTopicsHealth* = object
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string]
MultiRequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth

View File

@ -0,0 +1,39 @@
import waku/common/broker/request_broker
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health, health_report]
import waku/waku_core/topics
import waku/common/waku_protocol
export protocol_health, topic_health
# Get the overall node connectivity status
RequestBroker(sync):
type RequestConnectionStatus* = object
connectionStatus*: ConnectionStatus
# Get the health status of a set of content topics
RequestBroker(sync):
type RequestContentTopicsHealth* = object
contentTopicHealth*: seq[tuple[topic: ContentTopic, health: TopicHealth]]
proc signature(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string]
# Get a consolidated node health report
RequestBroker:
type RequestHealthReport* = object
healthReport*: HealthReport
# Get the health status of a set of shards (pubsub topics)
RequestBroker(sync):
type RequestShardTopicsHealth* = object
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
proc signature(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string]
# Get the health status of a mounted protocol
RequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth
proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]
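
A consumer-side sketch (illustrative, not part of this diff; it assumes the request proc that RequestBroker generates, matching its use elsewhere in this changeset):

import waku/requests/health_requests
import waku/common/broker/broker_context

# RequestConnectionStatus is declared sync: the provider answers inline.
let status = RequestConnectionStatus.request(globalBrokerContext()).valueOr:
  quit("no provider registered: " & error)
echo "connection status: ", $status.connectionStatus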

View File

@ -1,3 +1,3 @@
import ./[health_request, rln_requests, node_requests]
import ./[health_requests, rln_requests, node_requests]
export health_request, rln_requests, node_requests
export health_requests, rln_requests, node_requests

View File

@ -2,7 +2,8 @@
import results
import chronicles, json_serialization, json_serialization/std/options
import ../../../waku_node, ../serdes
import ../serdes
import waku/[waku_node, api/types]
#### Serialization and deserialization
@@ -44,6 +45,7 @@ proc writeValue*(
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("nodeHealth", $value.nodeHealth)
writer.writeField("connectionStatus", $value.connectionStatus)
writer.writeField("protocolsHealth", value.protocolsHealth)
writer.endRecord()
@@ -52,6 +54,7 @@ proc readValue*(
) {.raises: [SerializationError, IOError].} =
var
nodeHealth: Option[HealthStatus]
connectionStatus: Option[ConnectionStatus]
protocolsHealth: Option[seq[ProtocolHealth]]
for fieldName in readObjectFields(reader):
@@ -66,6 +69,16 @@ proc readValue*(
reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
nodeHealth = some(health)
of "connectionStatus":
if connectionStatus.isSome():
reader.raiseUnexpectedField(
"Multiple `connectionStatus` fields found", "HealthReport"
)
let state = ConnectionStatus.init(reader.readValue(string)).valueOr:
reader.raiseUnexpectedValue("Invalid `connectionStatus` value: " & $error)
connectionStatus = some(state)
of "protocolsHealth":
if protocolsHealth.isSome():
reader.raiseUnexpectedField(
@@ -79,5 +92,8 @@ proc readValue*(
if nodeHealth.isNone():
reader.raiseUnexpectedValue("Field `nodeHealth` is missing")
if connectionStatus.isNone():
reader.raiseUnexpectedValue("Field `connectionStatus` is missing")
value =
HealthReport(nodeHealth: nodeHealth.get, protocolsHealth: protocolsHealth.get(@[]))
value = HealthReport(
nodeHealth: nodeHealth.get,
connectionStatus: connectionStatus.get,
protocolsHealth: protocolsHealth.get(@[]),
)
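
For reference, after this change a serialized report carries the new field alongside the existing ones. A schematic example only: the enum spellings depend on how `$` renders `HealthStatus` and `ConnectionStatus`, and the `protocolsHealth` element layout is defined by its own serializer, so both are illustrative here:

{
  "nodeHealth": "Ready",
  "connectionStatus": "Connected",
  "protocolsHealth": []
}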

View File

@@ -7,4 +7,4 @@ import bearssl/rand, stew/byteutils
proc generateRequestId*(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
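# 10 random bytes encode to a 20-character hex request id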
return toHex(bytes)
return byteutils.toHex(bytes)

View File

@@ -297,13 +297,13 @@ method put*(
pubsubTopic: PubsubTopic,
message: WakuMessage,
): Future[ArchiveDriverResult[void]] {.async.} =
let messageHash = toHex(messageHash)
let messageHash = byteutils.toHex(messageHash)
let contentTopic = message.contentTopic
let payload = toHex(message.payload)
let payload = byteutils.toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
let meta = toHex(message.meta)
let meta = byteutils.toHex(message.meta)
trace "put PostgresDriver",
messageHash, contentTopic, payload, version, timestamp, meta
@@ -439,7 +439,7 @@ proc getMessagesArbitraryQuery(
var args: seq[string]
if cursor.isSome():
let hashHex = toHex(cursor.get())
let hashHex = byteutils.toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
@@ -520,7 +520,7 @@ proc getMessageHashesArbitraryQuery(
var args: seq[string]
if cursor.isSome():
let hashHex = toHex(cursor.get())
let hashHex = byteutils.toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
@@ -630,7 +630,7 @@ proc getMessagesPreparedStmt(
return ok(rows)
let hashHex = toHex(cursor.get())
let hashHex = byteutils.toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
@@ -723,7 +723,7 @@ proc getMessageHashesPreparedStmt(
return ok(rows)
let hashHex = toHex(cursor.get())
let hashHex = byteutils.toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)

View File

@@ -213,13 +213,13 @@ method put*(
messageHash: WakuMessageHash,
receivedTime: Timestamp,
): Future[ArchiveDriverResult[void]] {.async.} =
let digest = toHex(digest.data)
let messageHash = toHex(messageHash)
let digest = byteutils.toHex(digest.data)
let messageHash = byteutils.toHex(messageHash)
let contentTopic = message.contentTopic
let payload = toHex(message.payload)
let payload = byteutils.toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
let meta = toHex(message.meta)
let meta = byteutils.toHex(message.meta)
trace "put PostgresDriver", timestamp = timestamp
@@ -312,7 +312,7 @@ proc getMessagesArbitraryQuery(
args.add(pubsubTopic.get())
if cursor.isSome():
let hashHex = toHex(cursor.get().hash)
let hashHex = byteutils.toHex(cursor.get().hash)
var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc entreeCallback(pqResult: ptr PGresult) =
@@ -463,7 +463,7 @@ proc getMessagesPreparedStmt(
let limit = $maxPageSize
if cursor.isSome():
let hash = toHex(cursor.get().hash)
let hash = byteutils.toHex(cursor.get().hash)
var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
@@ -576,7 +576,7 @@ proc getMessagesV2PreparedStmt(
var stmtDef =
if ascOrder: SelectWithCursorV2AscStmtDef else: SelectWithCursorV2DescStmtDef
let digest = toHex(cursor.get().digest.data)
let digest = byteutils.toHex(cursor.get().digest.data)
let timestamp = $cursor.get().storeTime
(

View File

@@ -29,7 +29,7 @@ type WakuFilterClient* = ref object of LPProtocol
func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
return byteutils.toHex(bytes)
proc sendSubscribeRequest(
wfc: WakuFilterClient,

View File

@@ -5,7 +5,7 @@
{.push raises: [].}
import
std/[strformat, strutils],
std/[strformat, strutils, sets],
stew/byteutils,
results,
sequtils,
@@ -21,11 +21,13 @@ import
import
waku/waku_core,
waku/node/health_monitor/topic_health,
waku/requests/health_request,
waku/requests/health_requests,
waku/events/health_events,
./message_id,
waku/common/broker/broker_context
waku/common/broker/broker_context,
waku/events/peer_events
from ../waku_core/codecs import WakuRelayCodec
from waku/waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
type ShardMetrics = object
@@ -154,6 +156,8 @@ type
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
brokerCtx: BrokerContext
peerEventListener: EventWakuPeerListener
# seq of tuples: the first entry in the tuple contains the validators that are called for every topic
# the second entry contains the error messages to be returned when the validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
@@ -165,6 +169,11 @@ type
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]
topicHealthUpdateEvent: AsyncEvent
topicHealthDirty: HashSet[string]
# topics whose health must be re-evaluated the next time the update event fires
topicHealthCheckAll: bool
# when true, every subscribed topic has its health refreshed on the next update event
msgMetricsPerShard*: Table[string, ShardMetrics]
# forward declaration for the more detailed results returned when publishing a new message
@@ -287,6 +296,21 @@ proc initRelayObservers(w: WakuRelay) =
)
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome():
let ctrl = msgs.control.get()
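# GRAFT/PRUNE control messages signal mesh membership changes,
# so only the affected topics need their health re-evaluated.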
var topicsChanged = false
for graft in ctrl.graft:
w.topicHealthDirty.incl(graft.topicID)
topicsChanged = true
for prune in ctrl.prune:
w.topicHealthDirty.incl(prune.topicID)
topicsChanged = true
if topicsChanged:
w.topicHealthUpdateEvent.fire()
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
@@ -325,18 +349,6 @@ proc initRelayObservers(w: WakuRelay) =
w.addObserver(administrativeObserver)
proc initRequestProviders(w: WakuRelay) =
RequestRelayTopicsHealth.setProvider(
globalBrokerContext(),
proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] =
var collectedRes: RequestRelayTopicsHealth
for topic in topics:
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
collectedRes.topicHealth.add((topic, health))
return ok(collectedRes),
).isOkOr:
error "Cannot set Relay Topics Health request provider", error = error
proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
@@ -354,12 +366,25 @@ proc new*(
maxMessageSize = maxMessageSize,
parameters = GossipsubParameters,
)
w.brokerCtx = globalBrokerContext()
procCall GossipSub(w).initPubSub()
w.topicsHealth = initTable[string, TopicHealth]()
w.topicHealthUpdateEvent = newAsyncEvent()
w.topicHealthDirty = initHashSet[string]()
w.topicHealthCheckAll = false
w.initProtocolHandler()
w.initRelayObservers()
w.initRequestProviders()
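# A peer disconnect can shrink any mesh, so schedule a health recheck of all topics.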
w.peerEventListener = EventWakuPeer.listen(
w.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected:
w.topicHealthCheckAll = true
w.topicHealthUpdateEvent.fire()
,
).valueOr:
return err("Failed to subscribe to peer events: " & error)
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
@@ -437,38 +462,58 @@ proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.SUFFICIENTLY_HEALTHY
proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} =
var futs = newSeq[Future[void]]()
for topic in toSeq(wakuRelay.topics.keys):
## loop over all the topics I'm subscribed to
let
oldHealth = wakuRelay.topicsHealth.getOrDefault(topic)
currentHealth = wakuRelay.calculateTopicHealth(topic)
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
if oldHealth == currentHealth:
continue
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())
wakuRelay.topicsHealth[topic] = currentHealth
if not wakuRelay.onTopicHealthChange.isNil():
let fut = wakuRelay.onTopicHealthChange(topic, currentHealth)
if not fut.completed(): # Fast path for successful sync handlers
futs.add(fut)
proc topicsHealthLoop(w: WakuRelay) {.async.} =
while true:
await w.topicHealthUpdateEvent.wait()
w.topicHealthUpdateEvent.clear()
var topicsToCheck: seq[string]
if w.topicHealthCheckAll:
topicsToCheck = toSeq(w.topics.keys)
else:
topicsToCheck = toSeq(w.topicHealthDirty)
w.topicHealthCheckAll = false
w.topicHealthDirty.clear()
var futs = newSeq[Future[void]]()
for topic in topicsToCheck:
# guard against the topic having been unsubscribed since fire() was called
if not w.isSubscribed(topic):
continue
let
oldHealth = w.topicsHealth.getOrDefault(topic, TopicHealth.UNHEALTHY)
currentHealth = w.calculateTopicHealth(topic)
if oldHealth == currentHealth:
continue
w.topicsHealth[topic] = currentHealth
EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth)
if not w.onTopicHealthChange.isNil():
futs.add(w.onTopicHealthChange(topic, currentHealth))
if futs.len() > 0:
# slow path - we have to wait for the handlers to complete
try:
futs = await allFinished(futs)
discard await allFinished(futs)
except CancelledError:
# check for errors in futures
for fut in futs:
if fut.failed:
let err = fut.readError()
warn "Error in health change handler", description = err.msg
break
except CatchableError as e:
warn "Error in topic health callback", error = e.msg
proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
while true:
await wakuRelay.updateTopicsHealth()
await sleepAsync(10.seconds)
# safety cooldown: rate-limits recalculation when events fire in rapid bursts
await sleepAsync(100.milliseconds)
method start*(w: WakuRelay) {.async, base.} =
info "start"
@@ -478,15 +523,13 @@ method start*(w: WakuRelay) {.async, base.} =
method stop*(w: WakuRelay) {.async, base.} =
info "stop"
await procCall GossipSub(w).stop()
if w.peerEventListener.id != 0:
EventWakuPeer.dropListener(w.brokerCtx, w.peerEventListener)
if not w.topicHealthLoopHandle.isNil():
await w.topicHealthLoopHandle.cancelAndWait()
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())
proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
# rejects messages that are not WakuMessage
let wrappedValidator = proc(
@@ -584,7 +627,8 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
procCall GossipSub(w).subscribe(pubsubTopic, topicHandler)
w.topicHandlers[pubsubTopic] = topicHandler
asyncSpawn w.updateTopicsHealth()
w.topicHealthDirty.incl(pubsubTopic)
w.topicHealthUpdateEvent.fire()
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
@@ -594,6 +638,8 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
w.topicsHealth.del(pubsubTopic)
w.topicHealthDirty.excl(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
if not w.topicValidator.hasKey(pubsubTopic):
@@ -619,6 +665,8 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
w.topicsHealth.del(pubsubTopic)
w.topicHealthDirty.excl(pubsubTopic)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage

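The relay changes above replace the fixed 10-second polling loop with a dirty-set drained whenever an AsyncEvent fires, so many GRAFT/PRUNE/subscribe/disconnect triggers coalesce into a single recalculation pass. A stripped-down, self-contained chronos sketch of that pattern; all names here are illustrative, not the module's actual implementation:

import std/sets
import chronos

type Worker = ref object
  event: AsyncEvent
  dirty: HashSet[string]

proc loop(w: Worker) {.async.} =
  while true:
    await w.event.wait()
    w.event.clear()
    # Take a value-copy snapshot; marks made while we work re-fire the event.
    let batch = w.dirty
    w.dirty.clear()
    for item in batch:
      echo "re-evaluating ", item
    # brief cooldown so event bursts cannot spin the loop hot
    await sleepAsync(100.milliseconds)

proc mark(w: Worker, item: string) =
  w.dirty.incl(item)
  w.event.fire()

when isMainModule:
  let w = Worker(event: newAsyncEvent(), dirty: initHashSet[string]())
  let handle = loop(w)
  w.mark("topic-a")
  w.mark("topic-b") # coalesced with topic-a into one pass
  waitFor sleepAsync(300.milliseconds)
  waitFor handle.cancelAndWait()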
View File

@@ -346,7 +346,7 @@ proc generateRlnValidator*(
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(message)
let
proof = toHex(msgProof.proof)
proof = byteutils.toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)

View File

@@ -79,7 +79,8 @@ proc messageIngress*(
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
error "failed to insert new message",
msg_hash = byteutils.toHex(id.hash), error = $error
proc messageIngress*(
self: SyncReconciliation,
@@ -87,7 +88,7 @@ proc messageIngress*(
pubsubTopic: PubsubTopic,
msg: WakuMessage,
) =
trace "message ingress", msg_hash = msgHash.toHex(), msg = msg
trace "message ingress", msg_hash = byteutils.toHex(msgHash), msg = msg
if msg.ephemeral:
return
@@ -95,7 +96,8 @@ proc messageIngress*(
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
error "failed to insert new message",
msg_hash = byteutils.toHex(id.hash), error = $error
proc messageIngress*(
self: SyncReconciliation,
@@ -104,7 +106,8 @@ proc messageIngress*(
contentTopic: ContentTopic,
) =
self.storage.insert(id, pubsubTopic, contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
error "failed to insert new message",
msg_hash = byteutils.toHex(id.hash), error = $error
proc preProcessPayload(
self: SyncReconciliation, payload: RangesData