chore: Avoid double relay subscription (#3396)

* make sure subscribe once to every topic in waku_node
* start suggest use of removeValidator in waku_relay/protocol. Commented until libp2p updated.
This commit is contained in:
Ivan FB 2025-05-05 22:57:20 +02:00 committed by GitHub
parent 7c7ed5634f
commit 6bc05efc02
35 changed files with 596 additions and 251 deletions

View File

@ -381,7 +381,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if conf.relay:
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
await node.mountRelay(shards)
(await node.mountRelay(shards)).isOkOr:
echo "failed to mount relay: " & error
return
await node.mountLibp2pPing()
@ -535,7 +537,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
node.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), some(WakuRelayHandler(handler))
)
).isOkOr:
error "failed to subscribe to pubsub topic",
topic = DefaultPubsubTopic, error = error
if conf.rlnRelay:
info "WakuRLNRelay is enabled"

View File

@ -215,7 +215,10 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay()
(await cmb.nodev2.mountRelay()).isOkOr:
error "failed to mount relay", error = error
return
cmb.nodev2.wakuRelay.triggerSelf = false
# Bridging
@ -229,7 +232,9 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
except:
error "exception in relayHandler: " & getCurrentExceptionMsg()
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
error "failed to subscribe to relay", topic = DefaultPubsubTopic, error = error
return
proc stop*(cmb: Chat2MatterBridge) {.async: (raises: [Exception]).} =
info "Stopping Chat2MatterBridge"

View File

@ -554,7 +554,9 @@ proc subscribeAndHandleMessages(
else:
msgPerContentTopic[msg.contentTopic] = 1
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler)))
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr:
error "failed to subscribe to pubsub topic", pubsubTopic, error
quit(1)
when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
@ -619,7 +621,10 @@ when isMainModule:
let (node, discv5) = nodeRes.get()
waitFor node.mountRelay()
(waitFor node.mountRelay()).isOkOr:
error "failed to mount waku relay protocol: ", err = error
quit 1
waitFor node.mountLibp2pPing()
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =

View File

@ -86,7 +86,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
)
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
quit(1)
node.peerManager.start()
(await wakuDiscv5.start()).isOkOr:

View File

@ -84,7 +84,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
)
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
quit(1)
node.peerManager.start()
(await wakuDiscv5.start()).isOkOr:
@ -118,7 +120,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
contentTopic = msg.contentTopic,
timestamp = msg.timestamp
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler)))
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr:
error "failed to subscribe to pubsub topic", pubsubTopic, error
quit(1)
when isMainModule:
let rng = crypto.newRng()

View File

@ -187,5 +187,7 @@ proc new*(
except CatchableError:
error "could not handle SCP message: ", err = getCurrentExceptionMsg()
waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)).isOkOr:
error "could not subscribe to pubsub topic: ", err = $error
return err("could not subscribe to pubsub topic: " & $error)
return ok(SCP)

View File

@ -7,6 +7,7 @@ import
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_core/topics,
../../../../../waku/waku_relay/protocol,
../../../../../waku/node/peer_manager,
../../../../alloc
@ -108,12 +109,18 @@ proc process*(
case self.operation
of SUBSCRIBE:
# TO DO: properly perform 'subscribe'
waku.node.registerRelayDefaultHandler($self.pubsubTopic)
discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
waku.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic),
handler = some(self.relayEventCallback),
).isOkOr:
let errorMsg = "Subscribe failed:" & $error
error "SUBSCRIBE failed", error = errorMsg
return err(errorMsg)
of UNSUBSCRIBE:
# TODO: properly perform 'unsubscribe'
waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic)
waku.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic)).isOkOr:
let errorMsg = "Unsubscribe failed:" & $error
error "UNSUBSCRIBE failed", error = errorMsg
return err(errorMsg)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic

View File

@ -76,8 +76,10 @@ suite "Peer Manager":
# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()
(await client.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# And both nodes are started
await allFutures(server.start(), client.start())
@ -89,7 +91,8 @@ suite "Peer Manager":
await sleepAsync(FUTURE_TIMEOUT)
# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")).isOkOr:
assert false, "Failed to subscribe to relay"
await sleepAsync(FUTURE_TIMEOUT)
check:
@ -109,8 +112,10 @@ suite "Peer Manager":
# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()
(await client.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# And both nodes are started
await allFutures(server.start(), client.start())
@ -122,7 +127,8 @@ suite "Peer Manager":
await sleepAsync(FUTURE_TIMEOUT)
# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")).isOkOr:
assert false, "Failed to subscribe to relay"
await sleepAsync(FUTURE_TIMEOUT)
check:

View File

@ -135,7 +135,8 @@ suite "Waku Filter - End to End":
asyncTest "Client Node can't receive Push from Server Node, via Relay":
# Given the server node has Relay enabled
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "error mounting relay: " & $error
# And valid filter subscription
let subscribeResponse = await client.filterSubscribe(
@ -159,7 +160,8 @@ suite "Waku Filter - End to End":
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "error mounting relay: " & $error
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
@ -222,7 +224,8 @@ suite "Waku Filter - End to End":
pushedMsg == msg
asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay":
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "error mounting relay: " & $error
# Given a valid filter subscription
let subscribeResponse = await client.filterSubscribe(

View File

@ -52,7 +52,9 @@ suite "Waku Legacy Lightpush - End To End":
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountLegacyLightpush() # without rln-relay
client.mountLegacyLightpushClient()
@ -142,7 +144,8 @@ suite "RLN Proofs as a Lightpush Service":
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLegacyLightPush()
client.mountLegacyLightPushClient()
@ -187,8 +190,10 @@ suite "Waku Legacy Lightpush message delivery":
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
(await destNode.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
(await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
await bridgeNode.mountLegacyLightPush()
lightNode.mountLegacyLightPushClient()
@ -199,24 +204,25 @@ suite "Waku Legacy Lightpush message delivery":
await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
## Given
const CustomPubsubTopic = "/waku/2/rs/0/1"
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == DefaultPubsubTopic
topic == CustomPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic:" & $error
# Wait for subscription to take effect
await sleepAsync(100.millis)
## When
let res = await lightNode.legacyLightpushPublish(some(DefaultPubsubTopic), message)
let res = await lightNode.legacyLightpushPublish(some(CustomPubsubTopic), message)
assert res.isOk(), $res.error
## Then

View File

@ -46,7 +46,8 @@ suite "Waku Lightpush - End To End":
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountLightpush() # without rln-relay
client.mountLightpushClient()
@ -137,7 +138,8 @@ suite "RLN Proofs as a Lightpush Service":
await allFutures(server.start(), client.start())
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLightPush()
client.mountLightPushClient()
@ -182,8 +184,10 @@ suite "Waku Lightpush message delivery":
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
(await destNode.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
(await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()
@ -194,6 +198,7 @@ suite "Waku Lightpush message delivery":
await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()])
## Given
const CustomPubsubTopic = "/waku/2/rs/0/1"
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
@ -201,17 +206,18 @@ suite "Waku Lightpush message delivery":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == DefaultPubsubTopic
topic == CustomPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to relay"
# Wait for subscription to take effect
await sleepAsync(100.millis)
## When
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
let res = await lightNode.lightpushPublish(some(CustomPubsubTopic), message)
assert res.isOk(), $res.error
assert res.get() == 1, "Expected to relay the message to 1 node"

View File

@ -308,7 +308,8 @@ suite "Peer Manager":
asyncTest "Peer Protocol Support Verification (Before Connection)":
# Given the server has mounted some Waku protocols
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountFilter()
# When connecting to the server
@ -335,7 +336,8 @@ suite "Peer Manager":
server2RemotePeerInfo = server2.switch.peerInfo.toRemotePeerInfo()
server2PeerId = server2RemotePeerInfo.peerId
await server2.mountRelay()
(await server2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# When connecting to both servers
await client.connectToNodes(@[serverRemotePeerInfo, server2RemotePeerInfo])
@ -533,8 +535,10 @@ suite "Peer Manager":
suite "Peer Connectivity States":
asyncTest "State Tracking & Transition":
# Given two correctly initialised nodes, but not connected
await server.mountRelay()
await client.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await client.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# Then their connectedness should be NotConnected
check:
@ -587,8 +591,10 @@ suite "Peer Manager":
suite "Automatic Reconnection":
asyncTest "Automatic Reconnection Implementation":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await client.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await client.connectToNodes(@[serverRemotePeerInfo])
waitActive:
@ -810,7 +816,8 @@ suite "Mount Order":
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, listenIp, listenPort)
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.start()
let
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
@ -834,7 +841,8 @@ suite "Mount Order":
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, listenIp, listenPort)
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
serverPeerId = serverRemotePeerInfo.peerId
@ -859,7 +867,8 @@ suite "Mount Order":
server = newTestWakuNode(serverKey, listenIp, listenPort)
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
serverPeerId = serverRemotePeerInfo.peerId
@ -886,7 +895,8 @@ suite "Mount Order":
let
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
serverPeerId = serverRemotePeerInfo.peerId
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# When connecting to the server
await client.connectToNodes(@[serverRemotePeerInfo])
@ -910,7 +920,8 @@ suite "Mount Order":
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
serverPeerId = serverRemotePeerInfo.peerId
await server.start()
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# When connecting to the server
await client.connectToNodes(@[serverRemotePeerInfo])
@ -932,7 +943,8 @@ suite "Mount Order":
let
serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
serverPeerId = serverRemotePeerInfo.peerId
await server.mountRelay()
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.start()
# When connecting to the server

View File

@ -263,7 +263,9 @@ suite "Waku RlnRelay - End to End - Static":
completionFut.complete((topic, msg))
let subscriptionEvent = (kind: PubsubSub, topic: pubsubTopic)
server.subscribe(subscriptionEvent, some(relayHandler))
server.subscribe(subscriptionEvent, some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic"
await sleepAsync(FUTURE_TIMEOUT)
# Generate Messages
@ -357,7 +359,9 @@ suite "Waku RlnRelay - End to End - Static":
completionFut.complete((topic, msg))
let subscriptionEvent = (kind: PubsubSub, topic: pubsubTopic)
server.subscribe(subscriptionEvent, some(relayHandler))
server.subscribe(subscriptionEvent, some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic"
await sleepAsync(FUTURE_TIMEOUT)
# Generate Messages

View File

@ -282,8 +282,10 @@ procSuite "Peer Manager":
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
@ -323,7 +325,8 @@ procSuite "Peer Manager":
node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node3.peerManager.connectToRelayPeers()
@ -352,8 +355,10 @@ procSuite "Peer Manager":
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
@ -393,7 +398,8 @@ procSuite "Peer Manager":
node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node3.peerManager.manageRelayPeers()
@ -482,9 +488,11 @@ procSuite "Peer Manager":
await node1.start()
await node2.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
node1.wakuRelay.codec = betaCodec
await node2.mountRelay()
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
node2.wakuRelay.codec = betaCodec
require:
@ -506,7 +514,8 @@ procSuite "Peer Manager":
peerStorage = storage,
)
await node3.mountRelay()
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
node3.wakuRelay.codec = stableCodec
check:
# Node 2 and 3 have differing codecs

View File

@ -23,8 +23,10 @@ procSuite "Relay (GossipSub) Peer Exchange":
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)
# When both client and server mount relay without a handler
await node1.mountRelay(@[DefaultRelayShard])
await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))).isOkOr:
assert false, "Failed to mount relay"
# Then the relays are mounted without a handler
check:
@ -73,9 +75,12 @@ procSuite "Relay (GossipSub) Peer Exchange":
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler
# Givem the nodes mount relay with a peer exchange handler
await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))
(await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
(await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1

View File

@ -37,9 +37,12 @@ suite "Waku DNS Discovery":
node3 = newTestWakuNode(nodeKey3, bindIp, Port(63503))
enr3 = node3.enr
await node1.mountRelay()
await node2.mountRelay()
await node3.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await allFutures([node1.start(), node2.start(), node3.start()])
# Build and sign tree
@ -75,7 +78,8 @@ suite "Waku DNS Discovery":
nodeKey4 = generateSecp256k1Key()
node4 = newTestWakuNode(nodeKey4, bindIp, Port(63504))
await node4.mountRelay()
(await node4.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node4.start()
var wakuDnsDisc = WakuDnsDiscovery.init(location, resolver).get()

View File

@ -31,11 +31,13 @@ suite "Waku Keepalive":
completionFut.complete(true)
await node1.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.mountLibp2pPing()
await node2.start()
await node2.mountRelay()
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let pingProto = Ping.new(handler = pingHandler)
await pingProto.start()

View File

@ -34,13 +34,15 @@ suite "WakuNode":
# Setup node 1 with stable codec "/vac/waku/relay/2.0.0"
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
node1.wakuRelay.codec = "/vac/waku/relay/2.0.0"
# Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2"
check:
@ -61,7 +63,14 @@ suite "WakuNode":
msg.payload == payload
completionFut.complete(true)
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node2.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic"
await sleepAsync(2000.millis)
var res = await node1.publish(some($shard), message)
@ -92,8 +101,10 @@ suite "WakuNode":
node2PeerId = $(node2.switch.peerInfo.peerId)
node2Dns4Addr = "/dns4/localhost/tcp/61022/p2p/" & node2PeerId
await node1.mountRelay()
await node2.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await allFutures([node1.start(), node2.start()])
@ -117,7 +128,8 @@ suite "WakuNode":
# Initialize and start node1
await node1.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# Create an array to hold the other nodes
var otherNodes: seq[WakuNode] = @[]
@ -129,7 +141,8 @@ suite "WakuNode":
port = 60012 + i * 2 # Ensure unique ports for each node
node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port))
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
otherNodes.add(node)
# Connect all other nodes to node1
@ -296,10 +309,12 @@ suite "WakuNode":
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61016))
await node1.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay()
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node2.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
@ -337,10 +352,12 @@ suite "WakuNode":
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61020))
await node1.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay()
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node2.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])

View File

@ -30,7 +30,8 @@ suite "WakuNode - Relay":
# Relay protocol starts if mounted after node start
await node1.start()
await node1.mountRelay()
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
check:
GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false
@ -41,7 +42,8 @@ suite "WakuNode - Relay":
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0))
await node2.mountRelay()
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
check:
# Relay has not yet started as node has not yet started
@ -69,13 +71,16 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node3.start()
await node3.mountRelay(@[shard])
(await node3.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await allFutures(
node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
@ -93,7 +98,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
var res = await node1.publish(some($shard), message)
@ -136,13 +148,16 @@ suite "WakuNode - Relay":
# start all the nodes
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node3.start()
await node3.mountRelay(@[shard])
(await node3.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -179,7 +194,14 @@ suite "WakuNode - Relay":
# relay handler is called
completionFut.complete(true)
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
var res = await node1.publish(some($shard), message1)
@ -221,7 +243,8 @@ suite "WakuNode - Relay":
connOk == true
# Node 1 subscribes to topic
nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
@ -265,10 +288,12 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -283,7 +308,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
let res = await node2.publish(some($shard), message)
@ -314,10 +346,12 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -332,7 +366,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
let res = await node2.publish(some($shard), message)
@ -363,10 +404,12 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
#delete websocket peer address
# TODO: a better way to find the index - this is too brittle
@ -385,7 +428,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
let res = await node2.publish(some($shard), message)
@ -418,10 +468,12 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -436,7 +488,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
let res = await node2.publish(some($shard), message)
@ -477,10 +536,12 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[shard])
(await node1.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
await node2.mountRelay(@[shard])
(await node2.mountRelay(@[shard])).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -495,7 +556,14 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
let res = await node2.publish(some($shard), message)
@ -564,14 +632,15 @@ suite "WakuNode - Relay":
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Unsubscribe keep the subscription if other content topics also use the shard":
asyncTest "Only one subscription is allowed for contenttopics that generate the same shard":
## Setup
let
nodeKey = generateSecp256k1Key()
node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0))
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountSharding(1, 1).isOk
## Given
@ -593,19 +662,19 @@ suite "WakuNode - Relay":
"topic must use the same shard"
## When
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler))
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)).isErrOr:
assert false,
"The subscription should fail because is already subscribe to that shard"
node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)).isErrOr:
assert false,
"The subscription should fail because is already subscribe to that shard"
## Then
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB))
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr:
assert false, "Failed to unsubscribe to topic: " & $error
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicA))
check node.wakuRelay.isSubscribed(shard)
node.unsubscribe((kind: ContentUnsub, topic: contentTopicC))
check not node.wakuRelay.isSubscribed(shard)
## Cleanup
await node.stop()

View File

@ -5,6 +5,7 @@ import
stew/byteutils,
stew/shims/net as stewNet,
chronos,
chronicles,
libp2p/switch,
libp2p/protocols/pubsub/pubsub
@ -50,12 +51,6 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} =
)
)
proc setupRelayWithRln*(
node: WakuNode, identifier: uint, shards: seq[RelayShard]
) {.async.} =
await node.mountRelay(shards)
await setupRln(node, identifier)
proc subscribeToContentTopicWithHandler*(
node: WakuNode, contentTopic: string
): Future[bool] =
@ -66,7 +61,9 @@ proc subscribeToContentTopicWithHandler*(
if topic == topic:
completionFut.complete(true)
node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))
(node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))).isOkOr:
error "Failed to subscribe to content topic", error
completionFut.complete(true)
return completionFut
proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] =
@ -77,7 +74,9 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo
if topic == pubsubTopic:
completionFut.complete(true)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
(node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))).isOkOr:
error "Failed to subscribe to pubsub topic", error
completionFut.complete(false)
return completionFut
proc sendRlnMessage*(

View File

@ -58,7 +58,8 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultRelayShard])
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -74,7 +75,8 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultRelayShard])
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -89,7 +91,8 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultRelayShard])
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -115,8 +118,14 @@ procSuite "WakuNode - RLN relay":
if topic == DefaultPubsubTopic:
completionFut.complete(true)
# mount the relay handler
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
# prepare the message payload
@ -126,6 +135,11 @@ procSuite "WakuNode - RLN relay":
var message = WakuMessage(payload: @payload, contentTopic: contentTopic)
doAssert(node1.wakuRlnRelay.unsafeAppendRLNProof(message, epochTime()).isOk())
debug "Nodes participating in the test",
node1 = shortLog(node1.switch.peerInfo.peerId),
node2 = shortLog(node2.switch.peerInfo.peerId),
node3 = shortLog(node3.switch.peerInfo.peerId)
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
@ -187,9 +201,18 @@ procSuite "WakuNode - RLN relay":
elif topic == $shards[1]:
rxMessagesTopic2 = rxMessagesTopic2 + 1
## This unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[0])).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[1])).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
# mount the relay handlers
nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(1000.millis)
# generate some messages with rln proofs first. generating
@ -250,7 +273,8 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultRelayShard])
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -266,7 +290,8 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultRelayShard])
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -281,7 +306,8 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultRelayShard])
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -307,8 +333,14 @@ procSuite "WakuNode - RLN relay":
if topic == DefaultPubsubTopic:
completionFut.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
# mount the relay handler
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
# prepare the message payload
@ -366,7 +398,8 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultRelayShard])
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -382,7 +415,8 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultRelayShard])
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
@ -397,7 +431,8 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultRelayShard])
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig3 = WakuRlnConfig(
@ -456,8 +491,14 @@ procSuite "WakuNode - RLN relay":
if msg.payload == wm4.payload:
completionFut4.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
# mount the relay handler for node3
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
## node1 publishes and relays 4 messages to node2
@ -500,12 +541,15 @@ procSuite "WakuNode - RLN relay":
epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4
# Given both nodes mount relay and rlnrelay
await node1.mountRelay(shardSeq)
(await node1.mountRelay(shardSeq)).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
await node1.mountRlnRelay(wakuRlnConfig1)
(await node1.mountRlnRelay(wakuRlnConfig1)).isOkOr:
assert false, "Failed to mount rlnrelay"
# Mount rlnrelay in node2 in off-chain mode
await node2.mountRelay(@[DefaultRelayShard])
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)
@ -548,7 +592,8 @@ procSuite "WakuNode - RLN relay":
if msg == wm6:
completionFut6.complete(true)
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
# Given all messages have an rln proof and are published by the node 1
let publishSleepDuration: Duration = 5000.millis
@ -638,12 +683,14 @@ procSuite "WakuNode - RLN relay":
# Given both nodes mount relay and rlnrelay
# Mount rlnrelay in node1 in off-chain mode
await node1.mountRelay(shardSeq)
(await node1.mountRelay(shardSeq)).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
await node1.mountRlnRelay(wakuRlnConfig1)
# Mount rlnrelay in node2 in off-chain mode
await node2.mountRelay(@[DefaultRelayShard])
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)

View File

@ -5,6 +5,7 @@ import
stew/byteutils,
stew/shims/net as stewNet,
chronos,
chronicles,
libp2p/switch,
libp2p/protocols/pubsub/pubsub
@ -45,7 +46,10 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo
if topic == pubsubTopic:
completionFut.complete(true)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)).isOkOr:
error "failed to subscribe to relay", topic = pubsubTopic, error = error
completionFut.complete(false)
return completionFut
proc sendRlnMessage*(

View File

@ -102,7 +102,8 @@ suite "Waku v2 REST API CORS Handling":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -155,7 +156,8 @@ suite "Waku v2 REST API CORS Handling":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -211,7 +213,8 @@ suite "Waku v2 REST API CORS Handling":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -258,7 +261,8 @@ suite "Waku v2 REST API CORS Handling":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")

View File

@ -37,7 +37,8 @@ suite "Waku v2 REST API - Debug":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -66,7 +67,8 @@ suite "Waku v2 REST API - Debug":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")

View File

@ -54,7 +54,9 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
await testSetup.serviceNode.mountRelay()
(await testSetup.serviceNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay: " & $error
await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds)
await testSetup.subscriberNode.mountFilterClient()
@ -278,7 +280,8 @@ suite "Waku v2 Rest API - Filter V2":
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
# When
var requestBody = FilterSubscribeRequest(
@ -323,7 +326,8 @@ suite "Waku v2 Rest API - Filter V2":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
let requestBody = FilterSubscribeRequest(
requestId: "1001",
@ -394,7 +398,8 @@ suite "Waku v2 Rest API - Filter V2":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
let requestBody = FilterSubscribeRequest(
requestId: "1001",

View File

@ -42,7 +42,8 @@ suite "Waku v2 REST API - health":
let node = testWakuNode()
let healthMonitor = WakuNodeHealthMonitor()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
healthMonitor.setOverallHealth(HealthStatus.INITIALIZING)

View File

@ -58,8 +58,10 @@ proc init(
testSetup.consumerNode.start(),
)
await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
(await testSetup.consumerNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay: " & $error
(await testSetup.serviceNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay: " & $error
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()
@ -129,10 +131,13 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -161,7 +166,8 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -218,10 +224,13 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

View File

@ -58,8 +58,10 @@ proc init(
testSetup.consumerNode.start(),
)
await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
(await testSetup.consumerNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await testSetup.serviceNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await testSetup.serviceNode.mountLegacyLightPush(rateLimit)
testSetup.pushNode.mountLegacyLightPushClient()
@ -124,10 +126,13 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to topic"
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to topic"
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -156,7 +161,8 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to topic"
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
@ -216,10 +222,13 @@ suite "Waku v2 Rest API - lightpush":
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to topic"
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
).isOkOr:
assert false, "Failed to subscribe to topic"
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

View File

@ -41,7 +41,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -95,7 +96,8 @@ suite "Waku v2 Rest API - Relay":
shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3)
shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4)
await node.mountRelay(@[shard0, shard1, shard2, shard3])
(await node.mountRelay(@[shard0, shard1, shard2, shard3, shard4])).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -144,7 +146,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -220,7 +223,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
@ -245,7 +249,8 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to pubsub topic"
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -275,7 +280,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountSharding(1, 8).isOk
var restPort = Port(0)
@ -324,11 +330,13 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
restServer.start()
restPort = restServer.httpServer.address.port # update with bound port for client use
@ -347,11 +355,18 @@ suite "Waku v2 Rest API - Relay":
cache.contentSubscribe("/waku/2/default-contentY/proto")
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayDeleteAutoSubscriptionsV1(contentTopics)
var response = await client.relayPostAutoSubscriptionsV1(contentTopics)
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
response = await client.relayDeleteAutoSubscriptionsV1(contentTopics)
# Then
check:
@ -373,7 +388,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -437,7 +453,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
@ -461,7 +478,8 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: ContentSub, topic: DefaultContentTopic))
node.subscribe((kind: ContentSub, topic: DefaultContentTopic)).isOkOr:
assert false, "Failed to subscribe to content topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -489,7 +507,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
@ -539,7 +558,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
@ -564,7 +584,8 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -594,7 +615,8 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
@ -619,7 +641,8 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1

View File

@ -86,7 +86,8 @@ procSuite "Waku Rest API - Store v3":
asyncTest "invalid cursor":
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -165,7 +166,8 @@ procSuite "Waku Rest API - Store v3":
asyncTest "Filter by start and end time":
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -330,7 +332,8 @@ procSuite "Waku Rest API - Store v3":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -403,7 +406,8 @@ procSuite "Waku Rest API - Store v3":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -492,7 +496,8 @@ procSuite "Waku Rest API - Store v3":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -548,7 +553,8 @@ procSuite "Waku Rest API - Store v3":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")

View File

@ -301,12 +301,12 @@ proc setupProtocols(
debug "Setting max message size", num_bytes = parsedMaxMsgSize
try:
(
await mountRelay(
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
)
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
).isOkOr:
return err("failed to mount waku relay protocol: " & $error)
# Add validation keys to protected topics
var subscribedProtectedShards: seq[ProtectedShard]

View File

@ -256,7 +256,7 @@ proc mountStoreSync*(
## Waku relay
proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) =
proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return
@ -301,30 +301,34 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) =
proc subscribe*(
node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)
) =
): Result[void, string] =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentSub:
let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
of PubsubSub:
(subscription.topic, none(ContentTopic))
else:
return
return err("Unsupported subscription type in relay subscribe")
if node.wakuRelay.isSubscribed(pubsubTopic):
debug "already subscribed to topic", pubsubTopic
return err("Already subscribed to topic: " & $pubsubTopic)
if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()):
error "Invalid API call to `subscribe`. Was already subscribed"
return
return err("Invalid API call to `subscribe`. Was already subscribed")
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
node.registerRelayDefaultHandler(pubsubTopic)
@ -335,43 +339,49 @@ proc subscribe*(
if contentTopicOp.isSome():
node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler
proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) =
return ok()
proc unsubscribe*(
node: WakuNode, subscription: SubscriptionEvent
): Result[void, string] =
## Unsubscribes from a specific PubSub or Content topic.
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentUnsub:
let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
of PubsubUnsub:
(subscription.topic, none(ContentTopic))
else:
return
return err("Unsupported subscription type in relay unsubscribe")
if not node.wakuRelay.isSubscribed(pubsubTopic):
error "Invalid API call to `unsubscribe`. Was not subscribed"
error "Invalid API call to `unsubscribe`. Was not subscribed", pubsubTopic
return
err("Invalid API call to `unsubscribe`. Was not subscribed to: " & $pubsubTopic)
if contentTopicOp.isSome():
# Remove this handler only
var handler: TopicHandler
## TODO: refactor this part. I think we can simplify it
if node.contentTopicHandlers.pop(contentTopicOp.get(), handler):
debug "unsubscribe", contentTopic = contentTopicOp.get()
node.wakuRelay.unsubscribe(pubsubTopic, handler)
if contentTopicOp.isNone() or node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1:
# Remove all handlers
node.wakuRelay.unsubscribe(pubsubTopic)
else:
debug "unsubscribe", pubsubTopic = pubsubTopic
node.wakuRelay.unsubscribeAll(pubsubTopic)
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
return ok()
proc publish*(
node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage
): Future[Result[void, string]] {.async, gcsafe.} =
@ -433,20 +443,17 @@ proc mountRelay*(
shards: seq[RelayShard] = @[],
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(DefaultMaxWakuMessageSize),
) {.async, gcsafe.} =
): Future[Result[void, string]] {.async.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
return
return err("wakuRelay already mounted, skipping")
## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
info "mounting relay protocol"
let initRes = WakuRelay.new(node.switch, maxMessageSize)
if initRes.isErr():
error "failed mounting relay protocol", error = initRes.error
return
node.wakuRelay = initRes.value
node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr:
error "failed mounting relay protocol", error = error
return err("failed mounting relay protocol: " & error)
## Add peer exchange handler
if peerExchangeHandler.isSome():
@ -459,11 +466,17 @@ proc mountRelay*(
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
info "relay mounted successfully", shards = shards
## Make sure we don't have duplicates
let uniqueShards = deduplicate(shards)
# Subscribe to shards
for shard in shards:
node.subscribe((kind: PubsubSub, topic: $shard))
for shard in uniqueShards:
node.subscribe((kind: PubsubSub, topic: $shard)).isOkOr:
error "failed to subscribe to shard", error = error
return err("failed to subscribe to shard in mountRelay: " & error)
info "relay mounted successfully", shards = uniqueShards
return ok()
## Waku filter
@ -1218,6 +1231,7 @@ proc mountRlnRelay*(
raise
newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()
if (rlnConf.rlnRelayUserMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit):
error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
let validator = generateRlnValidator(rlnRelay, spamHandler)

View File

@ -18,7 +18,8 @@ import
waku/waku_api/rest/legacy_store/handlers as rest_store_legacy_api,
waku/waku_api/rest/health/handlers as rest_health_api,
waku/waku_api/rest/admin/handlers as rest_admin_api,
waku/waku_core/topics
waku/waku_core/topics,
waku/waku_relay/protocol
## Monitoring and external interfaces
@ -129,18 +130,31 @@ proc startRestServerProtocolSupport*(
## Relay REST API
if conf.relay:
## This MessageCache is used, f.e., in js-waku<>nwaku interop tests.
## js-waku tests asks nwaku-docker through REST whether a message is properly received.
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
let handler = messageCacheHandler(cache)
let handler: WakuRelayHandler = messageCacheHandler(cache)
for shard in conf.shards:
let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard)
cache.pubsubSubscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
## TODO: remove this line. use observer-observable pattern
## within waku_node::registerRelayDefaultHandler
discard node.wakuRelay.subscribe(pubsubTopic, handler)
for contentTopic in conf.contentTopics:
cache.contentSubscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
error "Autosharding error in REST", error = error
continue
let pubsubTopic = $shard
## TODO: remove this line. use observer-observable pattern
## within waku_node::registerRelayDefaultHandler
discard node.wakuRelay.subscribe(pubsubTopic, handler)
installRelayApiHandlers(router, node, cache)
else:

View File

@ -66,9 +66,13 @@ proc installRelayApiHandlers*(
for pubsubTopic in newTopics:
cache.pubsubSubscribe(pubsubTopic)
node.subscribe(
(kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))
)
).isOkOr:
let errorMsg = "Subscribe failed:" & $error
error "SUBSCRIBE failed", error = errorMsg
return RestApiResponse.internalServerError(errorMsg)
return RestApiResponse.ok()
@ -88,7 +92,10 @@ proc installRelayApiHandlers*(
# Unsubscribe all handlers from requested topics
for pubsubTopic in req:
cache.pubsubUnsubscribe(pubsubTopic)
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic))
node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)).isOkOr:
let errorMsg = "Unsubscribe failed:" & $error
error "UNSUBSCRIBE failed", error = errorMsg
return RestApiResponse.internalServerError(errorMsg)
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
@ -193,9 +200,13 @@ proc installRelayApiHandlers*(
for contentTopic in newTopics:
cache.contentSubscribe(contentTopic)
node.subscribe(
(kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache))
)
).isOkOr:
let errorMsg = "Subscribe failed:" & $error
error "SUBSCRIBE failed", error = errorMsg
return RestApiResponse.internalServerError(errorMsg)
return RestApiResponse.ok()
@ -211,7 +222,10 @@ proc installRelayApiHandlers*(
for contentTopic in req:
cache.contentUnsubscribe(contentTopic)
node.unsubscribe((kind: ContentUnsub, topic: contentTopic))
node.unsubscribe((kind: ContentUnsub, topic: contentTopic)).isOkOr:
let errorMsg = "Unsubscribe failed:" & $error
error "UNSUBSCRIBE failed", error = errorMsg
return RestApiResponse.internalServerError(errorMsg)
return RestApiResponse.ok()

View File

@ -129,7 +129,8 @@ type
# the second entry contains the error messages to be returned when the validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
topicValidator: Table[PubsubTopic, ValidatorHandler]
# map topic with its assigned validator within pubsub
publishObservers: seq[PublishObserver]
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
@ -427,7 +428,7 @@ proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())
proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
# rejects messages that are not WakuMessage
let wrappedValidator = proc(
pubsubTopic: string, message: messages.Message
@ -516,9 +517,10 @@ proc subscribe*(
# Add the ordered validator to the topic
# This assumes that if `w.validatorInserted.hasKey(pubSubTopic) is true`, it contains the ordered validator.
# Otherwise this might lead to unintended behaviour.
if not w.validatorInserted.hasKey(pubSubTopic):
if not w.topicValidator.hasKey(pubSubTopic):
let newValidator = w.generateOrderedValidator()
procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator())
w.validatorInserted[pubSubTopic] = true
w.topicValidator[pubSubTopic] = newValidator
# set this topic parameters for scoring
w.topicParams[pubsubTopic] = TopicParameters
@ -534,14 +536,36 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
debug "unsubscribe all", pubsubTopic = pubsubTopic
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.validatorInserted.del(pubsubTopic)
w.topicValidator.del(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) =
## Unsubscribe this handler on this pubsub topic
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
if not w.topicValidator.hasKey(pubsubTopic):
error "unsubscribe no validator for this topic", pubsubTopic
return
debug "unsubscribe", pubsubTopic = pubsubTopic
if pubsubtopic notin Pubsub(w).topics:
error "not subscribed to the given topic", pubsubTopic
return
procCall GossipSub(w).unsubscribe(pubsubTopic, handler)
var topicHandlerSeq: seq[TopicHandler]
var topicValidator: ValidatorHandler
try:
topicHandlerSeq = Pubsub(w).topics[pubsubTopic]
if topicHandlerSeq.len == 0:
error "unsubscribe no handler for this topic", pubsubTopic
return
topicValidator = w.topicValidator[pubsubTopic]
except KeyError:
error "exception in unsubscribe", pubsubTopic, error = getCurrentExceptionMsg()
return
let topicHandler = topicHandlerSeq[0]
debug "unsubscribe", pubsubTopic
procCall GossipSub(w).unsubscribe($pubsubTopic, topicHandler)
## TODO: uncomment the following line when https://github.com/vacp2p/nim-libp2p/pull/1356
## is available in a nim-libp2p release.
# procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
@ -624,7 +648,4 @@ proc getNumConnectedPeers*(
proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
## Returns a seq containing the current list of subscribed topics
var topics: seq[PubsubTopic]
for t in w.validatorInserted.keys():
topics.add(t)
return topics
return PubSub(w).topics.keys.toSeq().mapIt(cast[PubsubTopic](it))