resolve conflicts

This commit is contained in:
stubbsta 2025-04-28 10:04:08 +02:00
commit 19c3ea432d
22 changed files with 281 additions and 209 deletions

View File

@ -40,8 +40,8 @@ ifeq ($(detected_OS),Windows)
NIM_PARAMS += --passL:"-L$(MINGW_PATH)/lib"
NIM_PARAMS += --passL:"-Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc"
NIM_PARAMS += --passL:"-Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream"
LIBS = -static -lws2_32 -lbcrypt -liphlpapi -luserenv -lntdll -lminiupnpc -lnatpmp -lpq
LIBS = -static -lws2_32 -lbcrypt -liphlpapi -luserenv -lntdll -lminiupnpc -lnatpmp -lpq
NIM_PARAMS += $(foreach lib,$(LIBS),--passL:"$(lib)")
endif
@ -83,7 +83,7 @@ HEAPTRACKER_INJECT ?= 0
ifeq ($(HEAPTRACKER), 1)
# Needed to make nimbus-build-system use the Nim's 'heaptrack_support' branch
DOCKER_NIM_COMMIT := NIM_COMMIT=heaptrack_support
TARGET := prod-with-heaptrack
TARGET := heaptrack-build
ifeq ($(HEAPTRACKER_INJECT), 1)
# the Nim compiler will load 'libheaptrack_inject.so'
@ -340,6 +340,17 @@ docker-image:
--target $(TARGET) \
--tag $(DOCKER_IMAGE_NAME) .
docker-quick-image: MAKE_TARGET ?= wakunode2
docker-quick-image: DOCKER_IMAGE_TAG ?= $(MAKE_TARGET)-$(GIT_VERSION)
docker-quick-image: DOCKER_IMAGE_NAME ?= wakuorg/nwaku:$(DOCKER_IMAGE_TAG)
docker-quick-image: NIM_PARAMS := $(NIM_PARAMS) -d:chronicles_colors:none -d:insecure -d:postgres --passL:$(LIBRLN_FILE) --passL:-lm
docker-quick-image: | build deps librln wakunode2
docker build \
--build-arg="MAKE_TARGET=$(MAKE_TARGET)" \
--tag $(DOCKER_IMAGE_NAME) \
--file docker/binaries/Dockerfile.bn.amd64 \
.
docker-push:
docker push $(DOCKER_IMAGE_NAME)
@ -367,6 +378,14 @@ docker-liteprotocoltester:
--file apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile \
.
docker-quick-liteprotocoltester: DOCKER_LPT_TAG ?= latest
docker-quick-liteprotocoltester: DOCKER_LPT_NAME ?= wakuorg/liteprotocoltester:$(DOCKER_LPT_TAG)
docker-quick-liteprotocoltester: | liteprotocoltester
docker build \
--tag $(DOCKER_LPT_NAME) \
--file apps/liteprotocoltester/Dockerfile.liteprotocoltester \
.
docker-liteprotocoltester-push:
docker push $(DOCKER_LPT_NAME)
@ -497,5 +516,4 @@ release-notes:
release-notes |\
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101
# asked here: https://github.com/bvieira/sv4git/discussions/101

View File

@ -78,7 +78,7 @@ pipeline {
"--build-arg=NIMFLAGS='${params.NIMFLAGS} -d:postgres ' " +
"--build-arg=LOG_LEVEL='${params.LOWEST_LOG_LEVEL_ALLOWED}' " +
"--build-arg=DEBUG='${params.DEBUG ? "1" : "0"} ' " +
"--target=${params.HEAPTRACK ? "prod-with-heaptrack" : "prod"} ."
"--target=${params.HEAPTRACK ? "heaptrack-build" : "prod"} ."
)
} }
}

View File

@ -722,13 +722,13 @@ suite "Waku rln relay":
# validate messages
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
let
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time))
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1)
# wm2 is published within the same Epoch as wm1 and should be found as spam
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time))
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2)
# a valid message should be validated successfully
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time))
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3)
# wm4 has no rln proof and should not be validated
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time))
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4)
check:
msgValidate1 == MessageValidationResult.Valid
@ -778,9 +778,9 @@ suite "Waku rln relay":
# validate messages
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
let
msgValidate1 = wakuRlnRelay1.validateMessageAndUpdateLog(wm1, some(time))
msgValidate1 = wakuRlnRelay1.validateMessageAndUpdateLog(wm1)
# since this message is from a different sender, it should be validated successfully
msgValidate2 = wakuRlnRelay1.validateMessageAndUpdateLog(wm2, some(time))
msgValidate2 = wakuRlnRelay1.validateMessageAndUpdateLog(wm2)
check:
msgValidate1 == MessageValidationResult.Valid

View File

@ -487,13 +487,6 @@ procSuite "WakuNode - RLN relay":
await node3.stop()
xasyncTest "clearNullifierLog: should clear epochs > MaxEpochGap":
<<<<<<< HEAD
=======
## This is skipped because is flaky and made CI randomly fail but is useful to run manually
<<<<<<< HEAD
>>>>>>> deprecate_sync_strategy
=======
>>>>>>> deprecate_sync_strategy
# Given two nodes
let
contentTopic = ContentTopic("/waku/2/default-content/proto")

View File

@ -1,4 +1,4 @@
import std/[options, random], chronos
import std/[options, random], chronos, chronicles
import
waku/[
@ -23,7 +23,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
wantsTx: AsyncQueue[PeerId],
needsTx: AsyncQueue[(PeerId, Fingerprint)],
cluster: uint16 = 1,
shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7],
@ -51,7 +51,7 @@ proc newTestWakuRecon*(
proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
wantsRx: AsyncQueue[PeerId],
needsRx: AsyncQueue[(PeerId, Fingerprint)],
): SyncTransfer =
let peerManager = PeerManager.new(switch)

View File

@ -27,7 +27,7 @@ suite "Waku Sync: reconciliation":
var
idsChannel {.threadvar.}: AsyncQueue[SyncID]
localWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
localWants {.threadvar.}: AsyncQueue[PeerId]
remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
var server {.threadvar.}: SyncReconciliation
@ -43,7 +43,7 @@ suite "Waku Sync: reconciliation":
await allFutures(serverSwitch.start(), clientSwitch.start())
idsChannel = newAsyncQueue[SyncID]()
localWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
localWants = newAsyncQueue[PeerId]()
remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
@ -61,7 +61,6 @@ suite "Waku Sync: reconciliation":
asyncTest "sync 2 nodes both empty":
check:
idsChannel.len == 0
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
@ -69,7 +68,6 @@ suite "Waku Sync: reconciliation":
check:
idsChannel.len == 0
localWants.len == 0
remoteNeeds.len == 0
asyncTest "sync 2 nodes empty client full server":
@ -141,8 +139,6 @@ suite "Waku Sync: reconciliation":
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
localWants.contains((clientPeerInfo.peerId, hash3)) == false
localWants.contains((serverPeerInfo.peerId, hash2)) == false
var syncRes = await client.storeSynchronization(some(serverPeerInfo))
assert syncRes.isOk(), $syncRes.error
@ -150,8 +146,6 @@ suite "Waku Sync: reconciliation":
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true
localWants.contains((clientPeerInfo.peerId, hash3)) == true
localWants.contains((serverPeerInfo.peerId, hash2)) == true
asyncTest "sync 2 nodes different shards":
let
@ -170,8 +164,6 @@ suite "Waku Sync: reconciliation":
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
localWants.contains((clientPeerInfo.peerId, hash3)) == false
localWants.contains((serverPeerInfo.peerId, hash2)) == false
server = await newTestWakuRecon(
serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3]
@ -185,7 +177,6 @@ suite "Waku Sync: reconciliation":
check:
remoteNeeds.len == 0
localWants.len == 0
asyncTest "sync 2 nodes same hashes":
let
@ -200,14 +191,12 @@ suite "Waku Sync: reconciliation":
client.messageIngress(hash2, msg2)
check:
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check:
localWants.len == 0
remoteNeeds.len == 0
asyncTest "sync 2 nodes 100K msgs 1 diff":
@ -236,14 +225,12 @@ suite "Waku Sync: reconciliation":
timestamp += Timestamp(part)
check:
localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == false
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check:
localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == true
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true
asyncTest "sync 2 nodes 10K msgs 1K diffs":
@ -286,7 +273,6 @@ suite "Waku Sync: reconciliation":
continue
check:
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
@ -294,7 +280,6 @@ suite "Waku Sync: reconciliation":
# timimg issue make it hard to match exact numbers
check:
localWants.len > 900
remoteNeeds.len > 900
suite "Waku Sync: transfer":
@ -310,10 +295,10 @@ suite "Waku Sync: transfer":
var
serverIds {.threadvar.}: AsyncQueue[SyncID]
serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
serverLocalWants {.threadvar.}: AsyncQueue[PeerId]
serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
clientIds {.threadvar.}: AsyncQueue[SyncID]
clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
clientLocalWants {.threadvar.}: AsyncQueue[PeerId]
clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
var
@ -341,7 +326,7 @@ suite "Waku Sync: transfer":
clientPeerManager = PeerManager.new(clientSwitch)
serverIds = newAsyncQueue[SyncID]()
serverLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
serverLocalWants = newAsyncQueue[PeerId]()
serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
server = SyncTransfer.new(
@ -353,7 +338,7 @@ suite "Waku Sync: transfer":
)
clientIds = newAsyncQueue[SyncID]()
clientLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
clientLocalWants = newAsyncQueue[PeerId]()
clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
client = SyncTransfer.new(
@ -389,8 +374,8 @@ suite "Waku Sync: transfer":
serverDriver = serverDriver.put(DefaultPubsubTopic, msgs)
# add server info and msg hash to client want channel
let want = (serverPeerInfo.peerId, hash)
# add server info to client want channel
let want = serverPeerInfo.peerId
await clientLocalWants.put(want)
# add client info and msg hash to server need channel

View File

@ -120,7 +120,9 @@ suite "Waku v2 Rest API - Admin":
check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.len() == 0
getRes.data.len() == 1
getRes.data[0].multiaddr == nonExistentPeer
getRes.data[0].connected == CannotConnect
asyncTest "Get filter data":
await allFutures(
@ -274,7 +276,7 @@ suite "Waku v2 Rest API - Admin":
check:
postRes.status == 200
let getRes = await client.getConnectedRelayPeers()
let getRes = await client.getRelayPeers()
check:
getRes.status == 200
@ -286,13 +288,13 @@ suite "Waku v2 Rest API - Admin":
# Check peer 3
# Todo: investigate why the test setup missing remote peer's shard info
# let getRes2 = await client.getConnectedRelayPeersByShard(0)
# let getRes2 = await client.getRelayPeersByShard(0)
# check:
# getRes2.status == 200
# $getRes2.contentType == $MIMETYPE_JSON
# getRes2.data.peers.len() == 2
let getRes3 = await client.getConnectedRelayPeersByShard(99)
let getRes3 = await client.getRelayPeersByShard(99)
check:
getRes3.status == 200
$getRes3.contentType == $MIMETYPE_JSON

View File

@ -196,6 +196,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
except CatchableError:
return err("failed to create switch: " & getCurrentExceptionMsg())
let netConfig = builder.netConfig.get()
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
@ -203,12 +205,13 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
dnsNameServers = netConfig.dnsNameServers,
)
var node: WakuNode
try:
node = WakuNode.new(
netConfig = builder.netConfig.get(),
netConfig = netConfig,
enr = builder.record.get(),
switch = switch,
peerManager = peerManager,

View File

@ -155,6 +155,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul
dns4DomainName = dns4DomainName,
discv5UdpPort = discv5UdpPort,
wakuFlags = some(wakuFlags),
dnsNameServers = conf.dnsAddrsNameServers,
)
return netConfigRes

View File

@ -59,6 +59,7 @@ type Waku* = ref object
wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
dnsRetryLoopHandle: Future[void]
networkConnLoopHandle: Future[void]
discoveryMngr: DiscoveryManager
node*: WakuNode
@ -363,6 +364,15 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return
# The network connectivity loop checks periodically whether the node is online or not
# and triggers any change that depends on the network connectivity state
proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} =
while true:
await sleepAsync(15.seconds)
# Update online state
await waku.node.peerManager.updateOnlineState()
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
debug "Retrieve dynamic bootstrap nodes"
@ -400,6 +410,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()
# Start network connectivity check loop
waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop()
return ok()
# Waku shutdown
@ -411,6 +424,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
if not waku.metricsServer.isNil():
await waku.metricsServer.stop()
if not waku.networkConnLoopHandle.isNil():
await waku.networkConnLoopHandle.cancelAndWait()
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()

View File

@ -15,6 +15,7 @@ type NetConfig* = object
extIp*: Option[IpAddress]
extPort*: Option[Port]
dns4DomainName*: Option[string]
dnsNameServers*: seq[IpAddress]
announcedAddresses*: seq[MultiAddress]
extMultiAddrs*: seq[MultiAddress]
enrMultiAddrs*: seq[MultiAddress]
@ -75,6 +76,7 @@ proc init*(
discv5UdpPort = none(Port),
clusterId: uint16 = 0,
wakuFlags = none(CapabilitiesBitfield),
dnsNameServers = newSeq[IpAddress](),
): NetConfigResult =
## Initialize and validate waku node network configuration
@ -165,6 +167,7 @@ proc init*(
extPort: extPort,
wssEnabled: wssEnabled,
dns4DomainName: dns4DomainName,
dnsNameServers: dnsNameServers,
announcedAddresses: announcedAddresses,
extMultiAddrs: extMultiAddrs,
enrMultiaddrs: enrMultiaddrs,

View File

@ -8,6 +8,7 @@ import
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver,
libp2p/nameresolving/dnsresolver,
libp2p/peerstore
import
@ -73,6 +74,8 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5
DNSCheckDomain = "one.one.one.one"
type ConnectionChangeHandler* = proc(
peerId: PeerId, peerEvent: PeerEventKind
): Future[void] {.gcsafe, raises: [Defect].}
@ -95,11 +98,16 @@ type PeerManager* = ref object of RootObj
started: bool
shardedPeerManagement: bool # temp feature flag
onConnectionChange*: ConnectionChangeHandler
dnsNameServers*: seq[IpAddress]
online: bool
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
#~~~~~~~~~~~~~~~~~~~#
template isOnline*(self: PeerManager): bool =
self.online
proc calculateBackoff(
initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
): timer.Duration =
@ -535,7 +543,38 @@ proc getStreamByPeerIdAndProtocol*(
return ok(streamRes.get())
proc checkInternetConnectivity(
nameServerIps: seq[IpAddress], timeout = 2.seconds
): Future[bool] {.async.} =
var nameServers: seq[TransportAddress]
for ip in nameServerIps:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
# Resolve domain IP
let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
if resolved.len > 0:
return true
else:
return false
proc updateOnlineState*(pm: PeerManager) {.async.} =
let numConnectedPeers =
pm.switch.peerStore.peers().countIt(it.connectedness == Connected)
if numConnectedPeers > 0:
pm.online = true
else:
pm.online = await checkInternetConnectivity(pm.dnsNameServers)
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
# only attempt if current node is online
if not pm.isOnline():
error "connectToRelayPeers: won't attempt new connections - node is offline"
return
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
@ -778,6 +817,10 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
if pm.wakuMetadata.shards.len == 0:
return
if not pm.isOnline():
error "manageRelayPeers: won't attempt new connections - node is offline"
return
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
var peersToDisconnect: int
@ -1005,6 +1048,7 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
dnsNameServers = newSeq[IpAddress](),
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@ -1055,6 +1099,8 @@ proc new*(
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
dnsNameServers: dnsNameServers,
online: true,
)
proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =

View File

@ -212,7 +212,7 @@ proc mountStoreSync*(
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](0)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
let wantsChannel = newAsyncQueue[PeerId](0)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
var cluster: uint16

View File

@ -28,6 +28,10 @@ proc getPeerById*(
rest, endpoint: "/admin/v1/peer/{peerId}", meth: HttpMethod.MethodGet
.}
proc getServicePeers*(): RestResponse[seq[WakuPeer]] {.
rest, endpoint: "/admin/v1/peers/service", meth: HttpMethod.MethodGet
.}
proc getConnectedPeers*(): RestResponse[seq[WakuPeer]] {.
rest, endpoint: "/admin/v1/peers/connected", meth: HttpMethod.MethodGet
.}
@ -38,16 +42,14 @@ proc getConnectedPeersByShard*(
rest, endpoint: "/admin/v1/peers/connected/on/{shardId}", meth: HttpMethod.MethodGet
.}
proc getConnectedRelayPeers*(): RestResponse[PeersOfShards] {.
rest, endpoint: "/admin/v1/peers/connected/relay", meth: HttpMethod.MethodGet
proc getRelayPeers*(): RestResponse[PeersOfShards] {.
rest, endpoint: "/admin/v1/peers/relay", meth: HttpMethod.MethodGet
.}
proc getConnectedRelayPeersByShard*(
proc getRelayPeersByShard*(
shardId: uint16
): RestResponse[PeersOfShard] {.
rest,
endpoint: "/admin/v1/peers/connected/relay/on/{shardId}",
meth: HttpMethod.MethodGet
rest, endpoint: "/admin/v1/peers/relay/on/{shardId}", meth: HttpMethod.MethodGet
.}
proc getMeshPeers*(): RestResponse[PeersOfShards] {.

View File

@ -34,12 +34,13 @@ logScope:
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers
const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}"
const ROUTE_ADMIN_V1_SERVICE_PEERS* = "/admin/v1/peers/service" # returns all peers
const ROUTE_ADMIN_V1_CONNECTED_PEERS* = "/admin/v1/peers/connected"
const ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD* =
"/admin/v1/peers/connected/on/{shardId}"
const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS* = "/admin/v1/peers/connected/relay"
const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD* =
"/admin/v1/peers/connected/relay/on/{shardId}"
const ROUTE_ADMIN_V1_RELAY_PEERS* = "/admin/v1/peers/relay"
const ROUTE_ADMIN_V1_RELAY_PEERS_ON_SHARD* = "/admin/v1/peers/relay/on/{shardId}"
const ROUTE_ADMIN_V1_MESH_PEERS* = "/admin/v1/peers/mesh"
const ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD* = "/admin/v1/peers/mesh/on/{shardId}"
@ -62,46 +63,48 @@ proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) =
peer.origin,
)
proc populateAdminPeerInfo(peers: var WakuPeers, node: WakuNode, codec: string) =
let peersForCodec = node.peerManager.switch.peerStore.peers(codec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: codec,
shards: it.getShards(),
connected: it.connectedness,
agent: it.agent,
origin: it.origin,
proc populateAdminPeerInfo(
peers: var WakuPeers, node: WakuNode, codec: Option[string] = none[string]()
) =
if codec.isNone():
peers = node.peerManager.switch.peerStore.peers().mapIt(WakuPeer.init(it))
else:
let peersTuples = node.peerManager.switch.peerStore.peers(codec.get()).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: codec.get(),
shards: it.getShards(),
connected: it.connectedness,
agent: it.agent,
origin: it.origin,
)
)
)
tuplesToWakuPeers(peers, peersForCodec)
tuplesToWakuPeers(peers, peersTuples)
proc populateAdminPeerInfoForAll(node: WakuNode): WakuPeers =
var peers: WakuPeers = @[]
populateAdminPeerInfo(peers, node)
return peers
proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPeers =
var peers: WakuPeers = @[]
for codec in codecs:
populateAdminPeerInfo(peers, node, codec)
populateAdminPeerInfo(peers, node, some(codec))
return peers
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
let peers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec,
WakuReconciliationCodec,
],
)
let peers = populateAdminPeerInfoForAll(node)
let resp = RestApiResponse.jsonResponse(peers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(peers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_SINGLE_PEER) do(
peerId: string
@ -115,19 +118,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
if node.peerManager.switch.peerStore.peerExists(peerIdVal):
let peerInfo = node.peerManager.switch.peerStore.getPeer(peerIdVal)
let peer = WakuPeer.init(peerInfo)
let resp = RestApiResponse.jsonResponse(peer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(peer, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
else:
return RestApiResponse.notFound(fmt("Peer with ID {peerId} not found"))
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse:
let allPeers = populateAdminPeerInfoForCodecs(
router.api(MethodGet, ROUTE_ADMIN_V1_SERVICE_PEERS) do() -> RestApiResponse:
let peers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
@ -136,16 +138,26 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
],
)
let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected)
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(peers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse:
let allPeers = populateAdminPeerInfoForAll(node)
let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected)
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {error}")
)
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD) do(
shardId: uint16
@ -153,29 +165,21 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
let allPeers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec,
WakuReconciliationCodec,
],
)
let allPeers = populateAdminPeerInfoForAll(node)
let connectedPeers = allPeers.filterIt(
it.connected == Connectedness.Connected and it.shards.contains(shard)
)
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS) do() -> RestApiResponse:
router.api(MethodGet, ROUTE_ADMIN_V1_RELAY_PEERS) do() -> RestApiResponse:
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
@ -195,16 +199,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD) do(
router.api(MethodGet, ROUTE_ADMIN_V1_RELAY_PEERS_ON_SHARD) do(
shardId: uint16
) -> RestApiResponse:
let shard = shardId.valueOr:
@ -223,14 +226,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
shard: shard, peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager))
)
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS) do() -> RestApiResponse:
if node.wakuRelay.isNil():
@ -252,14 +254,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD) do(
shardId: uint16
@ -280,14 +281,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
shard: shard, peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager))
)
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
fmt("An error occurred while building the json response: {error}")
)
return resp.get()
return resp
proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do(

View File

@ -27,9 +27,6 @@ proc inHex*(
valueHex = "0" & valueHex
return toLowerAscii(valueHex)
proc toUserMessageLimit*(v: UInt256): UserMessageLimit =
return cast[UserMessageLimit](v)
proc encodeLengthPrefix*(input: openArray[byte]): seq[byte] =
## returns length prefixed version of the input
## with the following format [len<8>|input<var>]
@ -79,7 +76,17 @@ proc serialize*(
return output
proc serialize*(witness: RLNWitnessInput): seq[byte] =
## Serializes the witness into a byte array according to the RLN protocol format
## Serializes the RLN witness into a byte array following zerokit's expected format.
## The serialized format includes:
## - identity_secret (32 bytes, little-endian with zero padding)
## - user_message_limit (32 bytes, little-endian with zero padding)
## - message_id (32 bytes, little-endian with zero padding)
## - merkle tree depth (8 bytes, little-endian) = path_elements.len / 32
## - path_elements (each 32 bytes, ordered bottom-to-top)
## - merkle tree depth again (8 bytes, little-endian)
## - identity_path_index (sequence of bits as bytes, 0 = left, 1 = right)
## - x (32 bytes, little-endian with zero padding)
## - external_nullifier (32 bytes, little-endian with zero padding)
var buffer: seq[byte]
buffer.add(@(witness.identity_secret))
buffer.add(@(witness.user_message_limit))
@ -147,4 +154,23 @@ proc fromEpoch*(epoch: Epoch): uint64 =
func `+`*(a, b: Quantity): Quantity {.borrow.}
func u256*(n: Quantity): UInt256 {.inline.} =
n.uint64.stuint(256)
n.uint64.stuint(256)
proc uint64ToField*(n: uint64): array[32, byte] =
## Converts uint64 to 32-byte little-endian array with zero padding
var bytes = toBytes(n, Endianness.littleEndian)
result[0 ..< bytes.len] = bytes
proc UInt256ToField*(v: UInt256): array[32, byte] =
return cast[array[32, byte]](v)
proc seqToField*(s: seq[byte]): array[32, byte] =
result = default(array[32, byte])
let len = min(s.len, 32)
for i in 0 ..< len:
result[i] = s[i]
proc uint64ToIndex*(index: MembershipIndex, depth: int): seq[byte] =
result = newSeq[byte](depth)
for i in 0 ..< depth:
result[i] = byte((index shr i) and 1) # LSB-first bit decomposition

View File

@ -89,25 +89,6 @@ proc setMetadata*(
return err("failed to persist rln metadata: " & getCurrentExceptionMsg())
return ok()
proc uint64ToField*(n: uint64): array[32, byte] =
## Converts uint64 to 32-byte little-endian array with zero padding
var bytes = toBytes(n, Endianness.littleEndian)
result[0 ..< bytes.len] = bytes
proc UInt256ToField*(v: UInt256): array[32, byte] =
return cast[array[32, byte]](v)
proc seqToField*(s: seq[byte]): array[32, byte] =
result = default(array[32, byte])
let len = min(s.len, 32)
for i in 0 ..< len:
result[i] = s[i]
proc uint64ToIndex(index: MembershipIndex, depth: int): seq[byte] =
result = newSeq[byte](depth)
for i in 0 ..< depth:
result[i] = byte((index shr i) and 1) # LSB-first bit decomposition
proc fetchMerkleProofElements*(
g: OnchainGroupManager
): Future[Result[seq[byte], string]] {.async.} =
@ -199,6 +180,11 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async.} =
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
g.merkleProofCache = proofResult.get()
# also need update registerd membership
let memberCount = cast[int64](await wakuRlnContract.nextFreeIndex().call())
waku_rln_number_registered_memberships.set(float64(memberCount))
await sleepAsync(rpcDelay)
method atomicBatch*(
@ -356,7 +342,6 @@ method generateProof*(
except CatchableError:
error "Failed to fetch merkle proof", error = getCurrentExceptionMsg()
if (g.merkleProofCache.len mod 32) != 0:
return err("Invalid merkle proof cache length")
@ -460,7 +445,7 @@ method verifyProof*(
if extNullRes.isErr():
return err("could not construct external nullifier: " & extNullRes.error)
normalizedProof.externalNullifier = extNullRes.get()
let proofBytes = serialize(normalizedProof, input)
let proofBuffer = proofBytes.toBuffer()
@ -476,7 +461,6 @@ method verifyProof*(
,
)
if not ffiOk:
warn "verify_with_roots() returned failure status", proof = proof
return err("could not verify the proof")
@ -585,7 +569,6 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
ethRpc.ondisconnect = proc() =
asyncSpawn onDisconnect()
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true
return ok()

View File

@ -85,6 +85,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
var cumulativeProofsVerified = 0.float64
var cumulativeProofsGenerated = 0.float64
var cumulativeProofsRemaining = 100.float64
var cumulativeRegisteredMember = 0.float64
when defined(metrics):
logMetrics = proc() =
@ -107,6 +108,9 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
let freshProofsRemainingCount = parseAndAccumulate(
waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining
)
let freshRegisteredMemberCount = parseAndAccumulate(
waku_rln_number_registered_memberships, cumulativeRegisteredMember
)
info "Total messages", count = freshMsgCount
info "Total spam messages", count = freshSpamCount
@ -116,5 +120,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
info "Total proofs verified", count = freshProofsVerifiedCount
info "Total proofs generated", count = freshProofsGeneratedCount
info "Total proofs remaining", count = freshProofsRemainingCount
info "Total registered members", count = freshRegisteredMemberCount
return logMetrics

View File

@ -52,17 +52,19 @@ type RateLimitProof* = object
## the external nullifier used for the generation of the `proof` (derived from poseidon([epoch, rln_identifier]))
externalNullifier*: ExternalNullifier
type
Fr = array[32, byte] # Field element representation (256 bits)
type UInt40* = StUint[40]
type UInt32* = StUint[32]
type
Field = array[32, byte] # Field element representation (256 bits)
RLNWitnessInput* = object
identity_secret*: Fr
user_message_limit*: Fr
message_id*: Fr
identity_secret*: Field
user_message_limit*: Field
message_id*: Field
path_elements*: seq[byte]
identity_path_index*: seq[byte]
x*: Fr
external_nullifier*: Fr
x*: Field
external_nullifier*: Field
type ProofMetadata* = object
nullifier*: Nullifier

View File

@ -184,7 +184,7 @@ proc absDiff*(e1, e2: Epoch): uint64 =
return epoch2 - epoch1
proc validateMessage*(
rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption = none(float64)
rlnPeer: WakuRLNRelay, msg: WakuMessage
): MessageValidationResult =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MaxEpochGap of the current epoch
@ -206,12 +206,8 @@ proc validateMessage*(
# checks if the `msg`'s epoch is far from the current epoch
# it corresponds to the validation of rln external nullifier
var epoch: Epoch
if timeOption.isSome():
epoch = rlnPeer.calcEpoch(timeOption.get())
else:
# get current rln epoch
epoch = rlnPeer.getCurrentEpoch()
# get current rln epoch
let epoch: Epoch = rlnPeer.getCurrentEpoch()
let
msgEpoch = proof.epoch
@ -276,12 +272,12 @@ proc validateMessage*(
return MessageValidationResult.Valid
proc validateMessageAndUpdateLog*(
rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption = none(float64)
rlnPeer: WakuRLNRelay, msg: WakuMessage
): MessageValidationResult =
## validates the message and updates the log to prevent double messaging
## in future messages
let isValidMessage = rlnPeer.validateMessage(msg, timeOption)
let isValidMessage = rlnPeer.validateMessage(msg)
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():

View File

@ -46,13 +46,10 @@ type SyncReconciliation* = ref object of LPProtocol
storage: SyncStorage
# Receive IDs from transfer protocol for storage
# AsyncQueues are used as communication channels between
# reconciliation and transfer protocols.
idsRx: AsyncQueue[SyncID]
# Send Hashes to transfer protocol for reception
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)]
# Send Hashes to transfer protocol for transmission
localWantsTx: AsyncQueue[PeerId]
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)]
# params
@ -100,6 +97,9 @@ proc processRequest(
roundTrips = 0
diffs = 0
# Signal to transfer protocol that this reconciliation is starting
await self.localWantsTx.addLast(conn.peerId)
while true:
let readRes = catch:
await conn.readLp(int.high)
@ -143,7 +143,6 @@ proc processRequest(
diffs.inc()
for hash in hashToRecv:
self.localWantsTx.addLastNoWait((conn.peerId, hash))
diffs.inc()
rawPayload = sendPayload.deltaEncode()
@ -168,6 +167,9 @@ proc processRequest(
continue
# Signal to transfer protocol that this reconciliation is done
await self.localWantsTx.addLast(conn.peerId)
reconciliation_roundtrips.observe(roundTrips)
reconciliation_differences.observe(diffs)
@ -296,7 +298,7 @@ proc new*(
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: timer.Duration = DefaultGossipSubJitter,
idsRx: AsyncQueue[SyncID],
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)],
localWantsTx: AsyncQueue[PeerId],
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[Result[T, string]] {.async.} =
let res = await initFillStorage(syncRange, wakuArchive)

View File

@ -37,9 +37,9 @@ type SyncTransfer* = ref object of LPProtocol
idsTx: AsyncQueue[SyncID]
# Receive Hashes from reconciliation protocol for reception
localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)]
localWantsRx: AsyncQueue[PeerId]
localWantsRxFut: Future[void]
inSessions: Table[PeerId, HashSet[WakuMessageHash]]
inSessions: HashSet[PeerId]
# Receive Hashes from reconciliation protocol for transmission
remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)]
@ -78,19 +78,14 @@ proc openConnection(
return ok(conn)
proc wantsReceiverLoop(self: SyncTransfer) {.async.} =
## Waits for message hashes,
## store the peers and hashes locally as
## "supposed to be received"
## Waits for peer ids of nodes
## we are reconciliating with
while true: # infinite loop
let (peerId, fingerprint) = await self.localWantsRx.popFirst()
let peerId = await self.localWantsRx.popFirst()
self.inSessions.withValue(peerId, value):
value[].incl(fingerprint)
do:
var hashes = initHashSet[WakuMessageHash]()
hashes.incl(fingerprint)
self.inSessions[peerId] = hashes
if self.inSessions.containsOrIncl(peerId):
self.inSessions.excl(peerId)
return
@ -137,6 +132,10 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
proc initProtocolHandler(self: SyncTransfer) =
let handler = proc(conn: Connection, proto: string) {.async, closure.} =
while true:
if not self.inSessions.contains(conn.peerId):
error "unwanted peer, disconnecting", remote = conn.peerId
break
let readRes = catch:
await conn.readLp(int64(DefaultMaxWakuMessageSize))
@ -157,16 +156,6 @@ proc initProtocolHandler(self: SyncTransfer) =
let hash = computeMessageHash(pubsub, msg)
self.inSessions.withValue(conn.peerId, value):
if value[].missingOrExcl(hash):
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
break
do:
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
break
#TODO verify msg RLN proof...
(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
@ -193,7 +182,7 @@ proc new*(
peerManager: PeerManager,
wakuArchive: WakuArchive,
idsTx: AsyncQueue[SyncID],
localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)],
localWantsRx: AsyncQueue[PeerId],
remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)],
): T =
var transfer = SyncTransfer(