Merge f02e8bc6685310ac7577f61b0d7e084298191596 into 57ff24760fee77c711acaaea56ff9b9e150f6a27

This commit is contained in:
Tanya S 2026-06-27 13:28:02 +02:00 committed by GitHub
commit 49cbe8b7a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 96 additions and 137 deletions

View File

@ -544,7 +544,7 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
epochSizeSec: conf.rlnEpochSizeSec,
)
waitFor node.mountRlnRelay(rlnConf, spamHandler = some(spamHandler))
waitFor node.setRlnValidator(rlnConf, spamHandler = some(spamHandler))
let membershipIndex = node.rln.groupManager.membershipIndex.get()
let identityCredential = node.rln.groupManager.idCredentials.get()

View File

@ -619,7 +619,7 @@ when isMainModule:
)
try:
waitFor node.mountRlnRelay(rlnConf)
waitFor node.setRlnValidator(rlnConf)
except CatchableError:
error "failed to setup RLN", error = getCurrentExceptionMsg()
quit(QuitFailure)

View File

@ -337,7 +337,7 @@ proc setupProtocols(
)
try:
await node.mountRlnRelay(rlnConf)
await node.setRlnValidator(rlnConf)
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

View File

@ -13,6 +13,7 @@ import
libp2p/crypto/crypto,
libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/builders,
libp2p/transports/tcptransport,
@ -29,7 +30,6 @@ import
waku_archive,
waku_store_sync,
rln,
rln/adapters/relay as waku_rln_adapter,
node/waku_node,
node/subscription_manager,
node/peer_manager,
@ -185,27 +185,80 @@ proc mountRelay*(
## Waku RLN Relay
proc mountRlnRelay*(
proc setRlnValidator*(
node: WakuNode,
rlnConf: WakuRlnConfig,
spamHandler = none(SpamHandler),
registrationHandler = none(RegistrationHandler),
) {.async.} =
info "mounting rln relay"
if node.wakuRelay.isNil():
raise newException(
CatchableError, "WakuRelay protocol is not mounted, cannot mount Rln"
)
info "setting rln validator"
let rln = (await Rln.new(rlnConf, registrationHandler)).valueOr:
raise newException(CatchableError, "failed to mount Rln: " & error)
raise newException(CatchableError, "failed to set rln validator: " & error)
if (rlnConf.userMessageLimit > rln.groupManager.rlnRelayMaxMessageLimit):
error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
let validator = generateRlnValidator(rln, spamHandler)
error "rln-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
node.rln = rln
if node.wakuRelay.isNil():
info "WakuRelay not mounted; RLN validator not set"
return
## Bridges RLN's protocol-agnostic message validation into a relay
## (gossipsub) validator. The core decision is made by
## `validateMessageAndUpdateLog`; this maps the result to
## `pubsub.ValidationResult` so the validator can be installed on
## WakuRelay's validator chain.
proc validator(
topic: string, message: WakuMessage
): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"
rln.clearNullifierLog()
let msgProof = RateLimitProof.init(message.proof).valueOr:
trace "rln validator reject", error = error
return pubsub.ValidationResult.Reject
# validate the message and update log
let validationRes = await rln.validateMessageAndUpdateLog(message)
let
proof = byteutils.toHex(msgProof.proof)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
case validationRes
of Valid:
trace "message validity is verified, relaying",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
waku_rln_valid_messages_total.inc(labelValues = [topic])
return pubsub.ValidationResult.Accept
of Invalid:
trace "message validity could not be verified, discarding",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
return pubsub.ValidationResult.Reject
of Spam:
trace "A spam message is found! yay! discarding:",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
if spamHandler.isSome():
let handler = spamHandler.get()
handler(message)
return pubsub.ValidationResult.Reject
# register rln validator as default validator
info "Registering RLN validator"
node.wakuRelay.addValidator(validator, "RLN validation failed")
node.rln = rln

View File

@ -1,78 +0,0 @@
{.push raises: [].}
import
std/options,
chronicles,
chronos,
results,
stew/byteutils,
libp2p/protocols/pubsub/pubsub
import ../rln, ../protocol_types, ../protocol_metrics, ../conversion_utils
import logos_delivery/waku/[waku_relay, waku_core]
logScope:
topics = "waku rln adapter"
proc generateRlnValidator*(
rln: Rln, spamHandler = none(SpamHandler)
): WakuValidatorHandler =
## Bridges RLN's protocol-agnostic message validation into a relay
## (gossipsub) validator. The core decision is made by
## `validateMessageAndUpdateLog`; this adapter maps the result to
## `pubsub.ValidationResult` so the validator can be installed on
## WakuRelay's validator chain.
## Validation logic follows https://rfc.vac.dev/spec/17/
proc validator(
topic: string, message: WakuMessage
): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"
rln.clearNullifierLog()
let msgProof = RateLimitProof.init(message.proof).valueOr:
trace "generateRlnValidator reject", error = error
return pubsub.ValidationResult.Reject
# validate the message and update log
let validationRes = await rln.validateMessageAndUpdateLog(message)
let
proof = byteutils.toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
payload = string.fromBytes(message.payload)
case validationRes
of Valid:
trace "message validity is verified, relaying",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
waku_rln_valid_messages_total.inc(labelValues = [topic])
return pubsub.ValidationResult.Accept
of Invalid:
trace "message validity could not be verified, discarding",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
return pubsub.ValidationResult.Reject
of Spam:
trace "A spam message is found! yay! discarding:",
proof = proof,
root = root,
shareX = shareX,
shareY = shareY,
nullifier = nullifier
if spamHandler.isSome():
let handler = spamHandler.get()
handler(message)
return pubsub.ValidationResult.Reject
return validator

View File

@ -128,7 +128,7 @@ suite "RLN Proofs as a Lightpush Service":
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountRlnRelay(wakuRlnConfig)
await server.setRlnValidator(wakuRlnConfig)
check (await server.mountLegacyLightPush()).isOk()
client.mountLegacyLightPushClient()

View File

@ -125,7 +125,7 @@ suite "RLN Proofs as a Lightpush Service":
(await server.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await server.mountRlnRelay(wakuRlnConfig)
await server.setRlnValidator(wakuRlnConfig)
check (await server.mountLightPush()).isOk()
client.mountLightPushClient()

View File

@ -93,7 +93,7 @@ proc setupRelayWithOnChainRln*(
node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig
) {.async.} =
await node.mountRelay(shards)
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
suite "Waku RlnRelay - End to End - Static":
var
@ -128,22 +128,6 @@ suite "Waku RlnRelay - End to End - Static":
await allFutures(client.stop(), server.stop())
suite "Mount":
asyncTest "Can't mount if relay is not mounted":
# Given Relay and RLN are not mounted
check:
server.wakuRelay == nil
server.rln == nil
# When RlnRelay is mounted
let catchRes = catch:
await server.setupStaticRln(1)
# Then Relay and RLN are not mounted,and the process fails
check:
server.wakuRelay == nil
server.rln == nil
catchRes.error()[].msg == "WakuRelay protocol is not mounted, cannot mount Rln"
asyncTest "Pubsub topics subscribed before mounting RlnRelay are added to it":
# Given the node enables Relay and Rln while subscribing to a pubsub topic
await server.setupRelayWithStaticRln(1.uint, @[pubsubTopic])
@ -232,7 +216,7 @@ suite "Waku RlnRelay - End to End - Static":
)
try:
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
except CatchableError as e:
check e.msg ==
"failed to mount Rln: rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"

View File

@ -36,7 +36,7 @@ proc newTestWakuRelay*(switch = newTestSwitch()): Future[WakuRelay] {.async.} =
return proto
proc setupRln*(node: WakuNode, identifier: uint) {.async.} =
await node.mountRlnRelay(
await node.setRlnValidator(
WakuRlnConfig(dynamic: false, credIndex: some(identifier), epochSizeSec: 1)
)

View File

@ -52,7 +52,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig1 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.setRlnValidator(wakuRlnConfig1)
await node1.start()
# Registration is mandatory before sending messages with rln-relay
@ -76,7 +76,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig2 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(2))
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.setRlnValidator(wakuRlnConfig2)
await node2.start()
let manager2 = cast[OnchainGroupManager](node2.rln.groupManager)
@ -94,7 +94,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig3 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(3))
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.setRlnValidator(wakuRlnConfig3)
await node3.start()
let manager3 = cast[OnchainGroupManager](node3.rln.groupManager)
@ -165,7 +165,7 @@ procSuite "WakuNode - RLN relay":
assert false, "Failed to mount relay"
let wakuRlnConfig1 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.setRlnValidator(wakuRlnConfig1)
await node1.start()
let manager1 = cast[OnchainGroupManager](node1.rln.groupManager)
let idCredentials1 = generateCredentials()
@ -182,7 +182,7 @@ procSuite "WakuNode - RLN relay":
assert false, "Failed to mount relay"
let wakuRlnConfig2 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(2))
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.setRlnValidator(wakuRlnConfig2)
await node2.start()
let manager2 = cast[OnchainGroupManager](node2.rln.groupManager)
let idCredentials2 = generateCredentials()
@ -199,7 +199,7 @@ procSuite "WakuNode - RLN relay":
assert false, "Failed to mount relay"
let wakuRlnConfig3 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(3))
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.setRlnValidator(wakuRlnConfig3)
await node3.start()
let manager3 = cast[OnchainGroupManager](node3.rln.groupManager)
let idCredentials3 = generateCredentials()
@ -315,7 +315,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig1 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.setRlnValidator(wakuRlnConfig1)
await node1.start()
let manager1 = cast[OnchainGroupManager](node1.rln.groupManager)
@ -336,7 +336,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig2 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(2))
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.setRlnValidator(wakuRlnConfig2)
await node2.start()
let manager2 = cast[OnchainGroupManager](node2.rln.groupManager)
@ -352,7 +352,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig3 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(3))
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.setRlnValidator(wakuRlnConfig3)
await node3.start()
let manager3 = cast[OnchainGroupManager](node3.rln.groupManager)
@ -426,7 +426,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig1 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.setRlnValidator(wakuRlnConfig1)
await node1.start()
# Registration is mandatory before sending messages with rln-relay
@ -449,7 +449,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig2 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(2))
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.setRlnValidator(wakuRlnConfig2)
await node2.start()
# Registration is mandatory before sending messages with rln-relay
@ -467,7 +467,7 @@ procSuite "WakuNode - RLN relay":
let wakuRlnConfig3 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(3))
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.setRlnValidator(wakuRlnConfig3)
await node3.start()
# Registration is mandatory before sending messages with rln-relay
@ -595,7 +595,7 @@ procSuite "WakuNode - RLN relay":
assert false, "Failed to mount relay"
let wakuRlnConfig1 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.setRlnValidator(wakuRlnConfig1)
await node1.start()
# Registration is mandatory before sending messages with rln-relay
@ -614,7 +614,7 @@ procSuite "WakuNode - RLN relay":
assert false, "Failed to mount relay"
let wakuRlnConfig2 =
getWakuRlnConfig(manager = manager, index = MembershipIndex(2))
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.setRlnValidator(wakuRlnConfig2)
await node2.start()
# Registration is mandatory before sending messages with rln-relay

View File

@ -22,7 +22,7 @@ proc setupStaticRln*(
identifier: uint,
rlnRelayEthContractAddress: Option[string] = none(string),
) {.async.} =
await node.mountRlnRelay(
await node.setRlnValidator(
WakuRlnConfig(dynamic: false, credIndex: some(identifier), epochSizeSec: 1)
)

View File

@ -67,7 +67,7 @@ suite "Waku v2 REST API - health":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# kick in rln (currently the only check for health)
await node.mountRlnRelay(
await node.setRlnValidator(
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
)

View File

@ -261,7 +261,7 @@ suite "Waku v2 Rest API - Relay":
assert false, "Failed to mount relay"
let wakuRlnConfig = getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
await node.start()
# Registration is mandatory before sending messages with rln-relay
let manager = cast[OnchainGroupManager](node.rln.groupManager)
@ -509,7 +509,7 @@ suite "Waku v2 Rest API - Relay":
let wakuRlnConfig =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await meshNode.mountRlnRelay(wakuRlnConfig)
await meshNode.setRlnValidator(wakuRlnConfig)
await meshNode.start()
const testPubsubTopic = PubsubTopic("/waku/2/rs/1/0")
proc dummyHandler(
@ -530,7 +530,7 @@ suite "Waku v2 Rest API - Relay":
let wakuRlnConfig =
getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
await node.start()
await node.connectToNodes(@[meshNode.peerInfo.toRemotePeerInfo()])
@ -600,7 +600,7 @@ suite "Waku v2 Rest API - Relay":
require node.mountAutoSharding(1, 8).isOk
let wakuRlnConfig = getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
await node.start()
# Registration is mandatory before sending messages with rln-relay
@ -659,7 +659,7 @@ suite "Waku v2 Rest API - Relay":
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig = getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
await node.start()
# Registration is mandatory before sending messages with rln-relay
@ -731,7 +731,7 @@ suite "Waku v2 Rest API - Relay":
require node.mountAutoSharding(1, 8).isOk
let wakuRlnConfig = getWakuRlnConfig(manager = manager, index = MembershipIndex(1))
await node.mountRlnRelay(wakuRlnConfig)
await node.setRlnValidator(wakuRlnConfig)
await node.start()
# Registration is mandatory before sending messages with rln-relay