feat(rln-relay-v2): nonce/messageId manager (#2413)

* feat(rln-relay-v2): nonce/messageId manager

* fix: simplify
This commit is contained in:
Aaryamann Challani 2024-02-13 10:18:02 +05:30 committed by GitHub
parent 76ea0c8d72
commit 3842584558
13 changed files with 221 additions and 64 deletions

View File

@ -187,11 +187,11 @@ proc publish(c: Chat, line: string) =
if not isNil(c.node.wakuRlnRelay): if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic, # for future version when we support more than one rln protected content topic,
# we should check the message content topic as well # we should check the message content topic as well
let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time)) let appendRes = c.node.wakuRlnRelay.appendRLNProof(message, float64(time))
if not success: if appendRes.isErr():
debug "could not append rate limit proof to the message", success=success debug "could not append rate limit proof to the message"
else: else:
debug "rate limit proof is appended to the message", success=success debug "rate limit proof is appended to the message"
let decodeRes = RateLimitProof.init(message.proof) let decodeRes = RateLimitProof.init(message.proof)
if decodeRes.isErr(): if decodeRes.isErr():
error "could not decode the RLN proof" error "could not decode the RLN proof"
@ -514,14 +514,25 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "rln-relay preparation is in progress..." echo "rln-relay preparation is in progress..."
let rlnConf = WakuRlnConfig( when defined(rln_v2):
rlnRelayDynamic: conf.rlnRelayDynamic, let rlnConf = WakuRlnConfig(
rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayCredPassword: conf.rlnRelayCredPassword rlnRelayCredPath: conf.rlnRelayCredPath,
) rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
)
else:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
)
waitFor node.mountRlnRelay(rlnConf, waitFor node.mountRlnRelay(rlnConf,
spamHandler=some(spamHandler)) spamHandler=some(spamHandler))

View File

@ -266,6 +266,11 @@ type
defaultValue: "" defaultValue: ""
name: "rln-relay-cred-password" }: string name: "rln-relay-cred-password" }: string
rlnRelayUserMessageLimit* {.
desc: "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.",
defaultValue: 1,
name: "rln-relay-user-message-limit" .}: uint64
# NOTE: Keys are different in nim-libp2p # NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
try: try:

View File

@ -462,15 +462,27 @@ proc setupProtocols(node: WakuNode,
if conf.rlnRelay: if conf.rlnRelay:
let rlnConf = WakuRlnConfig( when defined(rln_v2):
rlnRelayDynamic: conf.rlnRelayDynamic, let rlnConf = WakuRlnConfig(
rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayCredPassword: conf.rlnRelayCredPassword, rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayTreePath: conf.rlnRelayTreePath, rlnRelayCredPassword: conf.rlnRelayCredPassword,
) rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
)
else:
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
)
try: try:
waitFor node.mountRlnRelay(rlnConf) waitFor node.mountRlnRelay(rlnConf)

View File

@ -58,7 +58,7 @@ proc sendRlnMessage(
payload: seq[byte] = "Hello".toBytes(), payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} = ): Future[bool] {.async.} =
var message = WakuMessage(payload: payload, contentTopic: contentTopic) var message = WakuMessage(payload: payload, contentTopic: contentTopic)
doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime())) doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk())
discard await client.publish(some(pubsubTopic), message) discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted return isCompleted
@ -249,18 +249,18 @@ suite "Waku RlnRelay - End to End":
WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic) WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic)
doAssert( doAssert(
client.wakuRlnRelay.appendRLNProof(message1b, epoch + EpochUnitSeconds * 0) client.wakuRlnRelay.appendRLNProof(message1b, epoch + EpochUnitSeconds * 0).isOk()
) )
doAssert( doAssert(
client.wakuRlnRelay.appendRLNProof(message1kib, epoch + EpochUnitSeconds * 1) client.wakuRlnRelay.appendRLNProof(message1kib, epoch + EpochUnitSeconds * 1).isOk()
) )
doAssert( doAssert(
client.wakuRlnRelay.appendRLNProof(message150kib, epoch + EpochUnitSeconds * 2) client.wakuRlnRelay.appendRLNProof(message150kib, epoch + EpochUnitSeconds * 2).isOk()
) )
doAssert( doAssert(
client.wakuRlnRelay.appendRLNProof( client.wakuRlnRelay.appendRLNProof(
message151kibPlus, epoch + EpochUnitSeconds * 3 message151kibPlus, epoch + EpochUnitSeconds * 3
) ).isOk()
) )
# When sending the 1B message # When sending the 1B message

View File

@ -4,4 +4,5 @@ import
./test_rln_group_manager_onchain, ./test_rln_group_manager_onchain,
./test_rln_group_manager_static, ./test_rln_group_manager_static,
./test_waku_rln_relay, ./test_waku_rln_relay,
./test_wakunode_rln_relay ./test_wakunode_rln_relay,
./test_rln_nonce_manager

View File

@ -0,0 +1,51 @@
{.used.}
import
testutils/unittests,
chronos,
os
import
../../../waku/waku_rln_relay/nonce_manager
suite "Nonce manager":
test "should initialize successfully":
let nm = NonceManager.init(nonceLimit = 100.uint)
check:
nm.nonceLimit == 100.uint
nm.nextNonce == 0.uint
test "should generate a new nonce":
let nm = NonceManager.init(nonceLimit = 100.uint)
let nonceRes = nm.get()
assert nonceRes.isOk(), $nonceRes.error
check:
nonceRes.get() == 0.uint
nm.nextNonce == 1.uint
test "should fail to generate a new nonce if limit is reached":
let nm = NonceManager.init(nonceLimit = 1.uint)
let nonceRes = nm.get()
let nonceRes2 = nm.get()
assert nonceRes.isOk(), $nonceRes.error
assert nonceRes2.isErr(), "Expected error, got: " & $nonceRes2.value
check:
nonceRes2.error.kind == NonceManagerErrorKind.NonceLimitReached
test "should generate a new nonce if epoch is crossed":
let nm = NonceManager.init(nonceLimit = 1.uint, epoch = float(0.000001))
let nonceRes = nm.get()
sleep(1)
let nonceRes2 = nm.get()
assert nonceRes.isOk(), $nonceRes.error
assert nonceRes2.isOk(), $nonceRes2.error
check:
nonceRes.value == 0.uint
nonceRes2.value == 0.uint

View File

@ -687,9 +687,9 @@ suite "Waku rln relay":
# ensure proofs are added # ensure proofs are added
require: require:
proofAdded1 proofAdded1.isOk()
proofAdded2 proofAdded2.isOk()
proofAdded3 proofAdded3.isOk()
# validate messages # validate messages
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid) # validateMessage proc checks the validity of the message fields and adds it to the log (if valid)

View File

@ -85,7 +85,7 @@ procSuite "WakuNode - RLN relay":
# prepare the epoch # prepare the epoch
var message = WakuMessage(payload: @payload, contentTopic: contentTopic) var message = WakuMessage(payload: @payload, contentTopic: contentTopic)
doAssert(node1.wakuRlnRelay.appendRLNProof(message, epochTime())) doAssert(node1.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk())
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
@ -155,12 +155,12 @@ procSuite "WakuNode - RLN relay":
for i in 0..<3: for i in 0..<3:
var message = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0]) var message = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0])
doAssert(nodes[0].wakuRlnRelay.appendRLNProof(message, epochTime)) doAssert(nodes[0].wakuRlnRelay.appendRLNProof(message, epochTime).isOk())
messages1.add(message) messages1.add(message)
for i in 0..<3: for i in 0..<3:
var message = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1]) var message = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1])
doAssert(nodes[1].wakuRlnRelay.appendRLNProof(message, epochTime)) doAssert(nodes[1].wakuRlnRelay.appendRLNProof(message, epochTime).isOk())
messages2.add(message) messages2.add(message)
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs) # publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
@ -346,9 +346,9 @@ procSuite "WakuNode - RLN relay":
# check proofs are added correctly # check proofs are added correctly
check: check:
proofAdded1 proofAdded1.isOk()
proofAdded2 proofAdded2.isOk()
proofAdded3 proofAdded3.isOk()
# relay handler for node3 # relay handler for node3
var completionFut1 = newFuture[bool]() var completionFut1 = newFuture[bool]()
@ -452,9 +452,9 @@ procSuite "WakuNode - RLN relay":
# check proofs are added correctly # check proofs are added correctly
check: check:
proofAdded1 proofAdded1.isOk()
proofAdded2 proofAdded2.isOk()
proofAdded3 proofAdded3.isOk()
# relay handler for node2 # relay handler for node2
var completionFut1 = newFuture[bool]() var completionFut1 = newFuture[bool]()

View File

@ -102,9 +102,8 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
# if RLN is mounted, append the proof to the message # if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil(): if not node.wakuRlnRelay.isNil():
# append the proof to the message # append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message, node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix())) float64(getTime().toUnix())).isOkOr:
if not success:
raise newException(ValueError, "Failed to publish: error appending RLN proof to message") raise newException(ValueError, "Failed to publish: error appending RLN proof to message")
# validate the message before sending it # validate the message before sending it
let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message) let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message)
@ -201,9 +200,8 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
# if RLN is mounted, append the proof to the message # if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil(): if not node.wakuRlnRelay.isNil():
# append the proof to the message # append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message, node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix())) float64(getTime().toUnix())).isOkOr:
if not success:
raise newException(ValueError, "Failed to publish: error appending RLN proof to message") raise newException(ValueError, "Failed to publish: error appending RLN proof to message")
# validate the message before sending it # validate the message before sending it
let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message) let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message)

View File

@ -130,9 +130,8 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# if RLN is mounted, append the proof to the message # if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil(): if not node.wakuRlnRelay.isNil():
# append the proof to the message # append the proof to the message
let success = node.wakuRlnRelay.appendRLNProof(message, node.wakuRlnRelay.appendRLNProof(message,
float64(getTime().toUnix())) float64(getTime().toUnix())).isOkOr:
if not success:
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message") return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr: (await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
@ -219,7 +218,7 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# if RLN is mounted, append the proof to the message # if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil(): if not node.wakuRlnRelay.isNil():
if not node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())): node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())).isOkOr:
return RestApiResponse.internalServerError( return RestApiResponse.internalServerError(
"Failed to publish: error appending RLN proof to message") "Failed to publish: error appending RLN proof to message")

View File

@ -188,10 +188,12 @@ when defined(rln_v2):
return err("user message limit is not set") return err("user message limit is not set")
waku_rln_proof_generation_duration_seconds.nanosecondTime: waku_rln_proof_generation_duration_seconds.nanosecondTime:
let proof = proofGen(rlnInstance = g.rlnInstance, let proof = proofGen(rlnInstance = g.rlnInstance,
data = data, data = data,
memKeys = g.idCredentials.get(), membership = g.idCredentials.get(),
memIndex = g.membershipIndex.get(), index = g.membershipIndex.get(),
epoch = epoch).valueOr: epoch = epoch,
userMessageLimit = g.userMessageLimit.get(),
messageId = messageId).valueOr:
return err("proof generation failed: " & $error) return err("proof generation failed: " & $error)
return ok(proof) return ok(proof)
else: else:

View File

@ -0,0 +1,67 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
stew/results,
times
import
./constants
export
chronos,
times,
results,
constants
# This module contains the NonceManager interface
# The NonceManager is responsible for managing the messageId used to generate RLN proofs
# It should be used to fetch a new messageId every time a proof is generated
# It refreshes the messageId every `epoch` seconds
type
Nonce* = uint64
NonceManager* = ref object of RootObj
epoch*: float64
nextNonce*: Nonce
lastNonceTime*: float64
nonceLimit*: Nonce
NonceManagerErrorKind* = enum
NonceLimitReached
NonceManagerError* = object
kind*: NonceManagerErrorKind
error*: string
NonceManagerResult*[T] = Result[T, NonceManagerError]
proc `$`*(ne: NonceManagerError): string =
case ne.kind
of NonceLimitReached:
return "NonceLimitReached: " & ne.error
proc init*(T: type NonceManager, nonceLimit: Nonce, epoch = EpochUnitSeconds): T =
return NonceManager(
epoch: epoch,
nextNonce: 0,
lastNonceTime: 0,
nonceLimit: nonceLimit
)
proc get*(n: NonceManager): NonceManagerResult[Nonce] =
let now = getTime().toUnixFloat()
var retNonce = n.nextNonce
if now - n.lastNonceTime >= n.epoch: retNonce = 0
n.nextNonce = retNonce + 1
n.lastNonceTime = now
if retNonce >= n.nonceLimit:
return err(NonceManagerError(kind: NonceLimitReached,
error: "Nonce limit reached. Please wait for the next epoch"))
return ok(retNonce)

View File

@ -20,6 +20,10 @@ import
./constants, ./constants,
./protocol_types, ./protocol_types,
./protocol_metrics ./protocol_metrics
when defined(rln_v2):
import ./nonce_manager
import import
../waku_relay, # for WakuRelayHandler ../waku_relay, # for WakuRelayHandler
../waku_core, ../waku_core,
@ -37,6 +41,8 @@ type WakuRlnConfig* = object
rlnRelayCredPath*: string rlnRelayCredPath*: string
rlnRelayCredPassword*: string rlnRelayCredPassword*: string
rlnRelayTreePath*: string rlnRelayTreePath*: string
when defined(rln_v2):
rlnRelayUserMessageLimit*: uint64
proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[( proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
seq[RawMembershipCredentials], string seq[RawMembershipCredentials], string
@ -78,7 +84,8 @@ type WakuRLNRelay* = ref object of RootObj
nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]] nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager groupManager*: GroupManager
nonce*: uint64 when defined(rln_v2):
nonceManager: NonceManager
method stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} = method stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
## stops the rln-relay protocol ## stops the rln-relay protocol
@ -282,7 +289,7 @@ proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] =
proc appendRLNProof*(rlnPeer: WakuRLNRelay, proc appendRLNProof*(rlnPeer: WakuRLNRelay,
msg: var WakuMessage, msg: var WakuMessage,
senderEpochTime: float64): bool = senderEpochTime: float64): RlnRelayResult[void] =
## returns true if it can create and append a `RateLimitProof` to the supplied `msg` ## returns true if it can create and append a `RateLimitProof` to the supplied `msg`
## returns false otherwise ## returns false otherwise
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds. ## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
@ -292,16 +299,16 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
let epoch = calcEpoch(senderEpochTime) let epoch = calcEpoch(senderEpochTime)
when defined(rln_v2): when defined(rln_v2):
# TODO: add support for incrementing nonce, will address in another PR let nonce = rlnPeer.nonceManager.get().valueOr:
let proofGenRes = rlnPeer.groupManager.generateProof(input, epoch, 1) return err("could not get new message id to generate an rln proof: " & $error)
let proof = rlnPeer.groupManager.generateProof(input, epoch, nonce).valueOr:
return err("could not generate rln-v2 proof: " & $error)
else: else:
let proofGenRes = rlnPeer.groupManager.generateProof(input, epoch) let proof = rlnPeer.groupManager.generateProof(input, epoch).valueOr:
return err("could not generate rln proof: " & $error)
if proofGenRes.isErr(): msg.proof = proof.encode().buffer
return false return ok()
msg.proof = proofGenRes.get().encode().buffer
return true
proc clearNullifierLog(rlnPeer: WakuRlnRelay) = proc clearNullifierLog(rlnPeer: WakuRlnRelay) =
# clear the first MaxEpochGap epochs of the nullifer log # clear the first MaxEpochGap epochs of the nullifer log
@ -397,7 +404,11 @@ proc mount(conf: WakuRlnConfig,
# Start the group sync # Start the group sync
await groupManager.startGroupSync() await groupManager.startGroupSync()
return WakuRLNRelay(groupManager: groupManager) when defined(rln_v2):
return WakuRLNRelay(groupManager: groupManager,
nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit))
else:
return WakuRLNRelay(groupManager: groupManager)
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
## returns true if the rln-relay protocol is ready to relay messages ## returns true if the rln-relay protocol is ready to relay messages