mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 15:16:05 +00:00
chore(rln): run rln in all relay pubsubtopics + remove cli flags (#1917)
This commit is contained in:
parent
b3bb7a1113
commit
af95b5713f
@ -456,7 +456,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
let peerInfo = parsePeerInfo(conf.lightpushnode)
|
||||
if peerInfo.isOk():
|
||||
await mountLightPush(node)
|
||||
node.mountLightPushClient()
|
||||
node.mountLightPushClient()
|
||||
node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec)
|
||||
else:
|
||||
error "LightPush not mounted. Couldn't parse conf.lightpushnode",
|
||||
@ -510,8 +510,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
let rlnConf = WakuRlnConfig(
|
||||
rlnRelayDynamic: conf.rlnRelayDynamic,
|
||||
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
|
||||
rlnRelayContentTopic: conf.rlnRelayContentTopic,
|
||||
rlnRelayCredIndex: conf.rlnRelayCredIndex,
|
||||
rlnRelayMembershipGroupIndex: conf.rlnRelayMembershipGroupIndex,
|
||||
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
|
||||
|
@ -242,16 +242,6 @@ type
|
||||
defaultValue: 0
|
||||
name: "rln-relay-membership-group-index" }: uint
|
||||
|
||||
rlnRelayContentTopic* {.
|
||||
desc: "the content topic for which rln-relay gets enabled",
|
||||
defaultValue: "/toy-chat/3/mingde/proto"
|
||||
name: "rln-relay-content-topic" }: ContentTopic
|
||||
|
||||
rlnRelayPubsubTopic* {.
|
||||
desc: "the pubsub topic for which rln-relay gets enabled",
|
||||
defaultValue: "/waku/2/default-waku/proto"
|
||||
name: "rln-relay-pubsub-topic" }: string
|
||||
|
||||
rlnRelayDynamic* {.
|
||||
desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false",
|
||||
defaultValue: false
|
||||
|
@ -397,8 +397,6 @@ proc setupProtocols(node: WakuNode,
|
||||
|
||||
let rlnConf = WakuRlnConfig(
|
||||
rlnRelayDynamic: conf.rlnRelayDynamic,
|
||||
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
|
||||
rlnRelayContentTopic: conf.rlnRelayContentTopic,
|
||||
rlnRelayCredIndex: conf.rlnRelayCredIndex,
|
||||
rlnRelayMembershipGroupIndex: conf.rlnRelayMembershipGroupIndex,
|
||||
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
|
||||
|
@ -155,16 +155,6 @@ type
|
||||
defaultValue: 0
|
||||
name: "rln-relay-membership-group-index" }: uint
|
||||
|
||||
rlnRelayPubsubTopic* {.
|
||||
desc: "the pubsub topic for which rln-relay gets enabled",
|
||||
defaultValue: "/waku/2/default-waku/proto"
|
||||
name: "rln-relay-pubsub-topic" }: string
|
||||
|
||||
rlnRelayContentTopic* {.
|
||||
desc: "the content topic for which rln-relay gets enabled",
|
||||
defaultValue: "/toy-chat/3/mingde/proto"
|
||||
name: "rln-relay-content-topic" }: string
|
||||
|
||||
rlnRelayDynamic* {.
|
||||
desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false",
|
||||
defaultValue: false
|
||||
|
@ -18,9 +18,6 @@ import
|
||||
../../../waku/waku_keystore,
|
||||
../testlib/common
|
||||
|
||||
const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
|
||||
const RlnRelayContentTopic = "waku/2/rlnrelay/proto"
|
||||
|
||||
proc createRLNInstanceWrapper(): RLNResult =
|
||||
return createRlnInstance(tree_path = genTempPath("rln_tree", "waku_rln_relay"))
|
||||
|
||||
@ -256,7 +253,7 @@ suite "Waku rln relay":
|
||||
require:
|
||||
rlnInstance.isOk()
|
||||
let rln = rlnInstance.get()
|
||||
|
||||
|
||||
require:
|
||||
rln.setMetadata(RlnMetadata(lastProcessedBlock: 128)).isOk()
|
||||
|
||||
@ -269,7 +266,7 @@ suite "Waku rln relay":
|
||||
|
||||
check:
|
||||
metadata.lastProcessedBlock == 128
|
||||
|
||||
|
||||
|
||||
test "Merkle tree consistency check between deletion and insertion":
|
||||
# create an RLN instance
|
||||
@ -660,8 +657,6 @@ suite "Waku rln relay":
|
||||
let index = MembershipIndex(5)
|
||||
|
||||
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: RlnRelayPubsubTopic,
|
||||
rlnRelayContentTopic: RlnRelayContentTopic,
|
||||
rlnRelayCredIndex: index.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_2"))
|
||||
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
|
||||
@ -709,13 +704,11 @@ suite "Waku rln relay":
|
||||
msgValidate2 == MessageValidationResult.Spam
|
||||
msgValidate3 == MessageValidationResult.Valid
|
||||
msgValidate4 == MessageValidationResult.Invalid
|
||||
|
||||
|
||||
asyncTest "should validate invalid proofs if bandwidth is available":
|
||||
let index = MembershipIndex(5)
|
||||
|
||||
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: RlnRelayPubsubTopic,
|
||||
rlnRelayContentTopic: RlnRelayContentTopic,
|
||||
rlnRelayCredIndex: index.uint,
|
||||
rlnRelayBandwidthThreshold: 4,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3"))
|
||||
@ -736,7 +729,7 @@ suite "Waku rln relay":
|
||||
# this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof)
|
||||
wm3 = WakuMessage(payload: "Invalid message".toBytes())
|
||||
wm4 = WakuMessage(payload: "Spam message".toBytes())
|
||||
|
||||
|
||||
let
|
||||
proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time)
|
||||
proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds)
|
||||
@ -765,7 +758,7 @@ suite "Waku rln relay":
|
||||
msgValidate2 == MessageValidationResult.Valid
|
||||
msgValidate3 == MessageValidationResult.Invalid
|
||||
msgValidate4 == MessageValidationResult.Spam
|
||||
|
||||
|
||||
|
||||
test "toIDCommitment and toUInt256":
|
||||
# create an instance of rln
|
||||
|
@ -23,9 +23,6 @@ import
|
||||
|
||||
from std/times import epochTime
|
||||
|
||||
|
||||
const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
|
||||
|
||||
procSuite "WakuNode - RLN relay":
|
||||
asyncTest "testing rln-relay with valid proof":
|
||||
|
||||
@ -40,17 +37,14 @@ procSuite "WakuNode - RLN relay":
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
rlnRelayPubSubTopic = RlnRelayPubsubTopic
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
# set up three nodes
|
||||
# node1
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node1.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 1.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
|
||||
))
|
||||
@ -58,11 +52,9 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.start()
|
||||
|
||||
# node 2
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node2.mountRelay(@[DefaultPubsubTopic])
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 2.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2"),
|
||||
))
|
||||
@ -70,11 +62,9 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.start()
|
||||
|
||||
# node 3
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node3.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 3.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_3"),
|
||||
))
|
||||
@ -88,11 +78,11 @@ procSuite "WakuNode - RLN relay":
|
||||
var completionFut = newFuture[bool]()
|
||||
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
debug "The received topic:", topic
|
||||
if topic == rlnRelayPubSubTopic:
|
||||
if topic == DefaultPubsubTopic:
|
||||
completionFut.complete(true)
|
||||
|
||||
# mount the relay handler
|
||||
node3.subscribe(rlnRelayPubSubTopic, relayHandler)
|
||||
node3.subscribe(DefaultPubsubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# prepare the message payload
|
||||
@ -106,7 +96,7 @@ procSuite "WakuNode - RLN relay":
|
||||
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
|
||||
## verifies the rate limit proof of the message and relays the message to node3
|
||||
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
|
||||
await node1.publish(rlnRelayPubSubTopic, message)
|
||||
await node1.publish(DefaultPubsubTopic, message)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
|
||||
@ -117,6 +107,71 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
||||
asyncTest "testing rln-relay is applied in all rln pubsub/content topics":
|
||||
|
||||
# create 3 nodes
|
||||
let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
|
||||
let pubsubTopics = @[
|
||||
PubsubTopic("/waku/2/pubsubtopic-a/proto"),
|
||||
PubsubTopic("/waku/2/pubsubtopic-b/proto")]
|
||||
let contentTopics = @[
|
||||
ContentTopic("/waku/2/content-topic-a/proto"),
|
||||
ContentTopic("/waku/2/content-topic-b/proto")]
|
||||
|
||||
# set up three nodes
|
||||
await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics)))
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
for index, node in nodes:
|
||||
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: index.uint + 1,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $(index+1))))
|
||||
|
||||
# start them
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
|
||||
# connect them together
|
||||
await nodes[0].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()])
|
||||
await nodes[2].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
var rxMessagesTopic1 = 0
|
||||
var rxMessagesTopic2 = 0
|
||||
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
info "relayHandler. The received topic:", topic
|
||||
if topic == pubsubTopics[0]:
|
||||
rxMessagesTopic1 = rxMessagesTopic1 + 1
|
||||
elif topic == pubsubTopics[1]:
|
||||
rxMessagesTopic2 = rxMessagesTopic2 + 1
|
||||
|
||||
# mount the relay handlers
|
||||
nodes[2].subscribe(pubsubTopics[0], relayHandler)
|
||||
nodes[2].subscribe(pubsubTopics[1], relayHandler)
|
||||
await sleepAsync(1000.millis)
|
||||
|
||||
# publish 5+5 messages to both pubsub topics and content topics
|
||||
for i in 0..<5:
|
||||
var message1 = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0])
|
||||
doAssert(nodes[0].wakuRlnRelay.appendRLNProof(message1, epochTime()))
|
||||
|
||||
var message2 = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1])
|
||||
doAssert(nodes[1].wakuRlnRelay.appendRLNProof(message2, epochTime()))
|
||||
|
||||
await nodes[0].publish(pubsubTopics[0], message1)
|
||||
await nodes[1].publish(pubsubTopics[1], message2)
|
||||
|
||||
# wait for gossip to propagate
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# check that node[2] got messages from both topics
|
||||
# and that rln was applied (4+4 messages were spam)
|
||||
check:
|
||||
rxMessagesTopic1 == 1
|
||||
rxMessagesTopic2 == 1
|
||||
|
||||
await allFutures(nodes.mapIt(it.stop()))
|
||||
|
||||
asyncTest "testing rln-relay with invalid proof":
|
||||
let
|
||||
# publisher node
|
||||
@ -129,17 +184,14 @@ procSuite "WakuNode - RLN relay":
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
rlnRelayPubSubTopic = RlnRelayPubsubTopic
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
# set up three nodes
|
||||
# node1
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node1.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 1.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -148,11 +200,9 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.start()
|
||||
|
||||
# node 2
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node2.mountRelay(@[DefaultPubsubTopic])
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 2.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -161,11 +211,9 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.start()
|
||||
|
||||
# node 3
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node3.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 3.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -181,11 +229,11 @@ procSuite "WakuNode - RLN relay":
|
||||
var completionFut = newFuture[bool]()
|
||||
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
debug "The received topic:", topic
|
||||
if topic == rlnRelayPubSubTopic:
|
||||
if topic == DefaultPubsubTopic:
|
||||
completionFut.complete(true)
|
||||
|
||||
# mount the relay handler
|
||||
node3.subscribe(rlnRelayPubSubTopic, relayHandler)
|
||||
node3.subscribe(DefaultPubsubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# prepare the message payload
|
||||
@ -214,7 +262,7 @@ procSuite "WakuNode - RLN relay":
|
||||
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
|
||||
## never gets called
|
||||
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
|
||||
await node1.publish(rlnRelayPubSubTopic, message)
|
||||
await node1.publish(DefaultPubsubTopic, message)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
check:
|
||||
@ -238,17 +286,14 @@ procSuite "WakuNode - RLN relay":
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
rlnRelayPubSubTopic = RlnRelayPubsubTopic
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
# set up three nodes
|
||||
# node1
|
||||
await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node1.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 1.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -257,12 +302,10 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.start()
|
||||
|
||||
# node 2
|
||||
await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node2.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 2.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -271,12 +314,10 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.start()
|
||||
|
||||
# node 3
|
||||
await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic])
|
||||
await node3.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayPubsubTopic: rlnRelayPubSubTopic,
|
||||
rlnRelayContentTopic: contentTopic,
|
||||
rlnRelayCredIndex: 3.uint,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
@ -315,7 +356,7 @@ procSuite "WakuNode - RLN relay":
|
||||
var completionFut4 = newFuture[bool]()
|
||||
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
debug "The received topic:", topic
|
||||
if topic == rlnRelayPubSubTopic:
|
||||
if topic == DefaultPubsubTopic:
|
||||
if msg == wm1:
|
||||
completionFut1.complete(true)
|
||||
if msg == wm2:
|
||||
@ -327,7 +368,7 @@ procSuite "WakuNode - RLN relay":
|
||||
|
||||
|
||||
# mount the relay handler for node3
|
||||
node3.subscribe(rlnRelayPubSubTopic, relayHandler)
|
||||
node3.subscribe(DefaultPubsubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
## node1 publishes and relays 4 messages to node2
|
||||
@ -336,10 +377,10 @@ procSuite "WakuNode - RLN relay":
|
||||
## node2 should detect either of wm1 or wm2 as spam and not relay it
|
||||
## node2 should relay wm3 to node3
|
||||
## node2 should not relay wm4 because it has no valid rln proof
|
||||
await node1.publish(rlnRelayPubSubTopic, wm1)
|
||||
await node1.publish(rlnRelayPubSubTopic, wm2)
|
||||
await node1.publish(rlnRelayPubSubTopic, wm3)
|
||||
await node1.publish(rlnRelayPubSubTopic, wm4)
|
||||
await node1.publish(DefaultPubsubTopic, wm1)
|
||||
await node1.publish(DefaultPubsubTopic, wm2)
|
||||
await node1.publish(DefaultPubsubTopic, wm3)
|
||||
await node1.publish(DefaultPubsubTopic, wm4)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
let
|
||||
|
@ -405,7 +405,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT
|
||||
error "can't get shard", error=topicMapRes.error
|
||||
return
|
||||
else: topicMapRes.get()
|
||||
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
||||
@ -456,7 +456,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten
|
||||
error "can't get shard", error = topicMapRes.error
|
||||
return
|
||||
else: topicMapRes.get()
|
||||
|
||||
|
||||
var futures = collect(newSeq):
|
||||
for pubsub, topics in topicMap.pairs:
|
||||
info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId
|
||||
@ -741,12 +741,13 @@ when defined(rln):
|
||||
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
|
||||
let rlnRelay = rlnRelayRes.get()
|
||||
let validator = generateRlnValidator(rlnRelay, spamHandler)
|
||||
let pb = PubSub(node.wakuRelay)
|
||||
pb.addValidator(rlnRelay.pubsubTopic, validator)
|
||||
|
||||
# register rln validator for all subscribed relay pubsub topics
|
||||
for pubsubTopic in node.wakuRelay.subscribedTopics:
|
||||
debug "Registering RLN validator for topic", pubsubTopic=pubsubTopic
|
||||
procCall GossipSub(node.wakuRelay).addValidator(pubsubTopic, validator)
|
||||
node.wakuRlnRelay = rlnRelay
|
||||
|
||||
|
||||
|
||||
## Waku peer-exchange
|
||||
|
||||
proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
|
@ -31,8 +31,6 @@ logScope:
|
||||
|
||||
type WakuRlnConfig* = object
|
||||
rlnRelayDynamic*: bool
|
||||
rlnRelayPubsubTopic*: PubsubTopic
|
||||
rlnRelayContentTopic*: ContentTopic
|
||||
rlnRelayCredIndex*: uint
|
||||
rlnRelayMembershipGroupIndex*: uint
|
||||
rlnRelayEthContractAddress*: string
|
||||
@ -80,10 +78,6 @@ proc calcEpoch*(t: float64): Epoch =
|
||||
return toEpoch(e)
|
||||
|
||||
type WakuRLNRelay* = ref object of RootObj
|
||||
pubsubTopic*: string # the pubsub topic for which rln relay is mounted
|
||||
# contentTopic should be of type waku_core.ContentTopic, however, due to recursive module dependency, the underlying type of ContentTopic is used instead
|
||||
# TODO a long-term solution is to place types with recursive dependency inside one file
|
||||
contentTopic*: string
|
||||
# the log of nullifiers and Shamir shares of the past messages grouped per epoch
|
||||
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
|
||||
lastEpoch*: Epoch # the epoch of the last published rln message
|
||||
@ -171,7 +165,7 @@ proc absDiff*(e1, e2: Epoch): uint64 =
|
||||
else:
|
||||
return epoch2 - epoch1
|
||||
|
||||
proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
msg: WakuMessage,
|
||||
timeOption = none(float64)): MessageValidationResult =
|
||||
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
|
||||
@ -293,10 +287,8 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
|
||||
proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler =
|
||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
|
||||
## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
|
||||
## it sets a validator for waku messages, acting in the registered pubsub topic
|
||||
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
||||
let contentTopic = wakuRlnRelay.contentTopic
|
||||
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
|
||||
@ -309,21 +301,12 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
info "message bandwidth limit exceeded, running rate limit proof validation"
|
||||
except OverflowDefect: # not a problem
|
||||
debug "not enough bandwidth, running rate limit proof validation"
|
||||
|
||||
|
||||
let decodeRes = WakuMessage.decode(message.data)
|
||||
if decodeRes.isOk():
|
||||
let
|
||||
wakumessage = decodeRes.value
|
||||
payload = string.fromBytes(wakumessage.payload)
|
||||
|
||||
# check the contentTopic
|
||||
if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic):
|
||||
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
|
||||
return pubsub.ValidationResult.Accept
|
||||
|
||||
|
||||
let wakumessage = decodeRes.value
|
||||
let decodeRes = RateLimitProof.init(wakumessage.proof)
|
||||
|
||||
if decodeRes.isErr():
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
@ -338,6 +321,7 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
shareX = inHex(msgProof.shareX)
|
||||
shareY = inHex(msgProof.shareY)
|
||||
nullifier = inHex(msgProof.nullifier)
|
||||
payload = string.fromBytes(wakumessage.payload)
|
||||
case validationRes:
|
||||
of Valid:
|
||||
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
|
||||
@ -403,12 +387,10 @@ proc mount(conf: WakuRlnConfig,
|
||||
await groupManager.startGroupSync()
|
||||
|
||||
let messageBucket = if conf.rlnRelayBandwidthThreshold > 0:
|
||||
some(TokenBucket.new(conf.rlnRelayBandwidthThreshold))
|
||||
some(TokenBucket.new(conf.rlnRelayBandwidthThreshold))
|
||||
else: none(TokenBucket)
|
||||
|
||||
return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic,
|
||||
contentTopic: conf.rlnRelayContentTopic,
|
||||
groupManager: groupManager,
|
||||
return WakuRLNRelay(groupManager: groupManager,
|
||||
messageBucket: messageBucket)
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user