diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 3f02eefe6..0da62ecb9 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -22,10 +22,10 @@ import libp2p/[switch, # manage transports, a single entry poi protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS nameresolving/dnsresolver]# define DNS resolution -import +import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_lightpush/rpc, - ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store, ../../waku/v2/node/[waku_node, waku_payload, waku_metrics], ../../waku/v2/node/dnsdisc/waku_dnsdisc, @@ -190,7 +190,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) = except ValueError: # Formatting fail. Print chat line in any case. echo chatLine - + c.prompt = false showChatPrompt(c) trace "Printing message", topic=DefaultPubsubTopic, chatLine, @@ -225,21 +225,26 @@ proc publish(c: Chat, line: string) = contentTopic: c.contentTopic, version: version, timestamp: getNanosecondTime(time)) when defined(rln): if not isNil(c.node.wakuRlnRelay): - # for future version when we support more than one rln protected content topic, + # for future version when we support more than one rln protected content topic, # we should check the message content topic as well let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time)) if not success: debug "could not append rate limit proof to the message", success=success else: debug "rate limit proof is appended to the message", success=success - # TODO move it to log after doogfooding - let msgEpoch = fromEpoch(message.proof.epoch) - if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(message.proof.epoch): + let decodeRes = RateLimitProof.init(message.proof) + if decodeRes.isErr(): + error "could not decode RLN proof" + + let proof = decodeRes.get() + # TODO move it to log after dogfooding + let msgEpoch = fromEpoch(proof.epoch) + if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(proof.epoch): echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!" else: echo "--rln epoch: ", msgEpoch # update the last epoch - c.node.wakuRlnRelay.lastEpoch = message.proof.epoch + c.node.wakuRlnRelay.lastEpoch = proof.epoch if not c.node.wakuLightPush.isNil(): # Attempt lightpush asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message) @@ -252,22 +257,27 @@ proc publish(c: Chat, line: string) = var message = WakuMessage(payload: chat2pb.buffer, contentTopic: c.contentTopic, version: 0, timestamp: getNanosecondTime(time)) when defined(rln): - if not isNil(c.node.wakuRlnRelay): - # for future version when we support more than one rln protected content topic, + if not isNil(c.node.wakuRlnRelay): + # for future version when we support more than one rln protected content topic, # we should check the message content topic as well let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time)) if not success: debug "could not append rate limit proof to the message", success=success else: debug "rate limit proof is appended to the message", success=success - # TODO move it to log after doogfooding - let msgEpoch = fromEpoch(message.proof.epoch) + let decodeRes = RateLimitProof.init(message.proof) + if decodeRes.isErr(): + error "could not decode the RLN proof" + + let proof = decodeRes.get() + # TODO move it to log after dogfooding + let msgEpoch = fromEpoch(proof.epoch) if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch: echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!" else: echo "--rln epoch: ", msgEpoch # update the last epoch - c.node.wakuRlnRelay.lastEpoch = message.proof.epoch + c.node.wakuRlnRelay.lastEpoch = proof.epoch if not c.node.wakuLightPush.isNil(): # Attempt lightpush @@ -316,7 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} = let address = await c.transp.readLine() if address.len > 0: await c.connectToNodes(@[address]) - + elif line.startsWith("/nick"): # Set a new nickname c.nick = await readNick(c.transp) @@ -325,9 +335,9 @@ proc writeAndPrint(c: Chat) {.async.} = elif line.startsWith("/exit"): if not c.node.wakuFilter.isNil(): echo "unsubscribing from content filters..." - + await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic) - + echo "quitting..." await c.node.stop() @@ -364,7 +374,7 @@ proc processInput(rfd: AsyncFD) {.async.} = let transp = fromPipe(rfd) conf = Chat2Conf.load() - + # set log level if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) @@ -375,7 +385,7 @@ proc processInput(rfd: AsyncFD) {.async.} = Port(uint16(conf.udpPort) + conf.portsShift)) node = WakuNode.new(conf.nodekey, conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), - extIp, extTcpPort, + extIp, extTcpPort, wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), wsEnabled = conf.websocketSupport, wssEnabled = conf.websocketSecureSupport) @@ -387,9 +397,9 @@ proc processInput(rfd: AsyncFD) {.async.} = if conf.relay: await node.mountRelay(conf.topics.split(" ")) - + await node.mountLibp2pPing() - + let nick = await readNick(transp) echo "Welcome, " & nick & "!" @@ -398,7 +408,7 @@ proc processInput(rfd: AsyncFD) {.async.} = subscribed: true, connected: false, started: true, - nick: nick, + nick: nick, prompt: false, contentTopic: conf.contentTopic, symKey: generateSymKey(conf.contentTopic)) @@ -406,13 +416,13 @@ proc processInput(rfd: AsyncFD) {.async.} = if conf.staticnodes.len > 0: echo "Connecting to static peers..." await connectToNodes(chat, conf.staticnodes) - + var dnsDiscoveryUrl = none(string) if conf.fleet != Fleet.none: # Use DNS discovery to connect to selected fleet echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..." - + if conf.fleet == Fleet.test: dnsDiscoveryUrl = some("enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im") else: @@ -437,7 +447,7 @@ proc processInput(rfd: AsyncFD) {.async.} = trace "resolving", domain=domain let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer - + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) if wakuDnsDiscovery.isOk: @@ -467,7 +477,7 @@ proc processInput(rfd: AsyncFD) {.async.} = elif discoveredNodes.len > 0: echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers" storenode = some(discoveredNodes[rand(0..len(discoveredNodes) - 1)]) - + if storenode.isSome(): # We have a viable storenode. Let's query it for historical messages. echo "Connecting to storenode: " & $(storenode.get()) @@ -487,7 +497,7 @@ proc processInput(rfd: AsyncFD) {.async.} = let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic])) if queryRes.isOk(): storeHandler(queryRes.value) - + # NOTE Must be mounted after relay if conf.lightpushnode != "": await mountLightPush(node) @@ -514,7 +524,7 @@ proc processInput(rfd: AsyncFD) {.async.} = trace "Hit subscribe handler", topic let decoded = WakuMessage.decode(data) - + if decoded.isOk(): if decoded.get().contentTopic == chat.contentTopic: chat.printReceivedMessage(decoded.get()) @@ -524,7 +534,7 @@ proc processInput(rfd: AsyncFD) {.async.} = let topic = DefaultPubsubTopic node.subscribe(topic, handler) - when defined(rln): + when defined(rln): if conf.rlnRelay: info "WakuRLNRelay is enabled" @@ -539,9 +549,9 @@ proc processInput(rfd: AsyncFD) {.async.} = showChatPrompt(chat) proc registrationHandler(txHash: string) {.gcsafe, closure.} = echo "You are registered to the rln membership contract, find details of your registration transaction in https://goerli.etherscan.io/tx/0x", txHash - + echo "rln-relay preparation is in progress..." - + let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, @@ -569,7 +579,7 @@ proc processInput(rfd: AsyncFD) {.async.} = if conf.metricsServer: startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift)) - + await chat.readWriteLoop() diff --git a/tests/v2/test_waku_rln_relay.nim b/tests/v2/test_waku_rln_relay.nim index 29835e39c..0eb2dd1be 100644 --- a/tests/v2/test_waku_rln_relay.nim +++ b/tests/v2/test_waku_rln_relay.nim @@ -854,14 +854,22 @@ suite "Waku rln relay": for index, x in shareX3.mpairs: shareX3[index] = 3 let shareY3 = shareX3 - ## TODO: when zerokit rln is integrated, RateLimitProof should be initialized passing a rlnIdentifier too (now implicitely set to 0) + proc encodeAndGetBuf(proof: RateLimitProof): seq[byte] = + return proof.encode().buffer + let wm1 = WakuMessage(proof: RateLimitProof(epoch: epoch, - nullifier: nullifier1, shareX: shareX1, shareY: shareY1)) + nullifier: nullifier1, + shareX: shareX1, + shareY: shareY1).encodeAndGetBuf()) wm2 = WakuMessage(proof: RateLimitProof(epoch: epoch, - nullifier: nullifier2, shareX: shareX2, shareY: shareY2)) + nullifier: nullifier2, + shareX: shareX2, + shareY: shareY2).encodeAndGetBuf()) wm3 = WakuMessage(proof: RateLimitProof(epoch: epoch, - nullifier: nullifier3, shareX: shareX3, shareY: shareY3)) + nullifier: nullifier3, + shareX: shareX3, + shareY: shareY3).encodeAndGetBuf()) # check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares # no duplicate for wm1 should be found, since the log is empty diff --git a/tests/v2/test_wakunode_rln_relay.nim b/tests/v2/test_wakunode_rln_relay.nim index 7e76244f8..3b1a68f0f 100644 --- a/tests/v2/test_wakunode_rln_relay.nim +++ b/tests/v2/test_wakunode_rln_relay.nim @@ -235,8 +235,9 @@ procSuite "WakuNode - RLN relay": memKeys = node1.wakuRlnRelay.membershipKeyPair, memIndex = MembershipIndex(1), epoch = epoch) - doAssert(rateLimitProofRes.isOk()) - let rateLimitProof = rateLimitProofRes.value + require: + rateLimitProofRes.isOk() + let rateLimitProof = rateLimitProofRes.get().encode().buffer let message = WakuMessage(payload: @payload, contentTopic: contentTopic, diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index bbe4b1183..30f8b24f1 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -15,18 +15,14 @@ import ../../common/protobuf, ../utils/time -when defined(rln): - import - ./waku_rln_relay/protocol_types - - const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default + type PubsubTopic* = string ContentTopic* = string -const +const DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto") DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto") @@ -41,9 +37,9 @@ type WakuMessage* = object # this field will be used in the rln-relay protocol # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec when defined(rln): - proof*: RateLimitProof + proof*: seq[byte] # The ephemeral field indicates if the message should - # be stored. bools and uints are + # be stored. bools and uints are # equivalent in serialization of the protobuf ephemeral*: bool @@ -58,7 +54,7 @@ proc encode*(message: WakuMessage): ProtoBuffer = buf.write3(3, message.version) buf.write3(10, zint64(message.timestamp)) when defined(rln): - buf.write3(21, message.proof.encode()) + buf.write3(21, message.proof) buf.write3(31, uint64(message.ephemeral)) buf.finish3() @@ -76,11 +72,11 @@ proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = discard ?pb.getField(10, timestamp) msg.timestamp = Timestamp(timestamp) - # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec + # Experimental: this is part of https://rfc.vac.dev/spec/17/ spec when defined(rln): var proofBytes: seq[byte] - discard ?pb.getField(21, proofBytes) - msg.proof = ?RateLimitProof.init(proofBytes) + if ?pb.getField(21, proofBytes): + msg.proof = proofBytes var ephemeral: uint if ?pb.getField(31, ephemeral): diff --git a/waku/v2/protocol/waku_rln_relay/utils.nim b/waku/v2/protocol/waku_rln_relay/utils.nim index 16bf2c9d6..4a9a43da8 100644 --- a/waku/v2/protocol/waku_rln_relay/utils.nim +++ b/waku/v2/protocol/waku_rln_relay/utils.nim @@ -583,20 +583,29 @@ proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool ## otherwise, returns false ## Returns an error if it cannot check for duplicates + let decodeRes = RateLimitProof.init(msg.proof) + if decodeRes.isErr(): + return err("failed to decode the RLN proof") + + let proof = decodeRes.get() + # extract the proof metadata of the supplied `msg` - let proofMD = ProofMetadata(nullifier: msg.proof.nullifier, - shareX: msg.proof.shareX, shareY: msg.proof.shareY) + let proofMD = ProofMetadata( + nullifier: proof.nullifier, + shareX: proof.shareX, + shareY: proof.shareY + ) # check if the epoch exists - if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch): + if not rlnPeer.nullifierLog.hasKey(proof.epoch): return ok(false) try: - if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD): + if rlnPeer.nullifierLog[proof.epoch].contains(proofMD): # there is an identical record, ignore rhe mag return ok(false) # check for a message with the same nullifier but different secret shares - let matched = rlnPeer.nullifierLog[msg.proof.epoch].filterIt(( + let matched = rlnPeer.nullifierLog[proof.epoch].filterIt(( it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or (it.shareY != proofMD.shareY))) @@ -615,21 +624,31 @@ proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] = ## saves it in the `nullifierLog` of the `rlnPeer` ## Returns an error if it cannot update the log - let proofMD = ProofMetadata(nullifier: msg.proof.nullifier, - shareX: msg.proof.shareX, shareY: msg.proof.shareY) + let decodeRes = RateLimitProof.init(msg.proof) + if decodeRes.isErr(): + return err("failed to decode the RLN proof") + + let proof = decodeRes.get() + + # extract the proof metadata of the supplied `msg` + let proofMD = ProofMetadata( + nullifier: proof.nullifier, + shareX: proof.shareX, + shareY: proof.shareY + ) debug "proof metadata", proofMD = proofMD # check if the epoch exists - if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch): - rlnPeer.nullifierLog[msg.proof.epoch] = @[proofMD] + if not rlnPeer.nullifierLog.hasKey(proof.epoch): + rlnPeer.nullifierLog[proof.epoch] = @[proofMD] return ok(true) try: # check if an identical record exists - if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD): + if rlnPeer.nullifierLog[proof.epoch].contains(proofMD): return ok(true) # add proofMD to the log - rlnPeer.nullifierLog[msg.proof.epoch].add(proofMD) + rlnPeer.nullifierLog[proof.epoch].add(proofMD) return ok(true) except KeyError as e: return err("the epoch was not found") @@ -680,6 +699,11 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, ## the `msg` does not violate the rate limit ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) ## if `timeOption` is supplied, then the current epoch is calculated based on that + let decodeRes = RateLimitProof.init(msg.proof) + if decodeRes.isErr(): + return MessageValidationResult.Invalid + + let proof = decodeRes.get() # track message count for metrics waku_rln_messages_total.inc() @@ -695,7 +719,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, debug "current epoch", currentEpoch = fromEpoch(epoch) let - msgEpoch = msg.proof.epoch + msgEpoch = proof.epoch # calculate the gaps gap = absDiff(epoch, msgEpoch) @@ -711,8 +735,8 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, return MessageValidationResult.Invalid ## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247 - if not rlnPeer.validateRoot(msg.proof.merkleRoot): - debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) + if not rlnPeer.validateRoot(proof.merkleRoot): + debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) # return MessageValidationResult.Invalid @@ -723,7 +747,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, waku_rln_proof_verification_total.inc() waku_rln_proof_verification_duration_seconds.nanosecondTime: - let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, msg.proof) + let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, proof) if proofVerificationRes.isErr(): waku_rln_errors_total.inc(labelValues=["proof_verification"]) @@ -749,7 +773,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, # it will never error out discard rlnPeer.updateLog(msg) debug "message is valid", payload = string.fromBytes(msg.payload) - let rootIndex = rlnPeer.validMerkleRoots.find(msg.proof.merkleRoot) + let rootIndex = rlnPeer.validMerkleRoots.find(proof.merkleRoot) waku_rln_valid_messages_total.observe(rootIndex.toFloat()) return MessageValidationResult.Valid @@ -775,10 +799,10 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage, memIndex = rlnPeer.membershipIndex, epoch = calcEpoch(senderEpochTime)) - if proof.isErr: + if proof.isErr(): return false - msg.proof = proof.value + msg.proof = proof.value.encode().buffer return true proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] = @@ -955,10 +979,10 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi ## the message validation logic is according to https://rfc.vac.dev/spec/17/ proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" - let msg = WakuMessage.decode(message.data) - if msg.isOk(): + let decodeRes = WakuMessage.decode(message.data) + if decodeRes.isOk(): let - wakumessage = msg.value() + wakumessage = decodeRes.value payload = string.fromBytes(wakumessage.payload) # check the contentTopic @@ -966,15 +990,22 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload return pubsub.ValidationResult.Accept + + let decodeRes = RateLimitProof.init(wakumessage.proof) + if decodeRes.isErr(): + return pubsub.ValidationResult.Reject + + let msgProof = decodeRes.get() + # validate the message let validationRes = node.wakuRlnRelay.validateMessage(wakumessage) - proof = toHex(wakumessage.proof.proof) - epoch = fromEpoch(wakumessage.proof.epoch) - root = inHex(wakumessage.proof.merkleRoot) - shareX = inHex(wakumessage.proof.shareX) - shareY = inHex(wakumessage.proof.shareY) - nullifier = inHex(wakumessage.proof.nullifier) + proof = toHex(msgProof.proof) + epoch = fromEpoch(msgProof.epoch) + root = inHex(msgProof.merkleRoot) + shareX = inHex(msgProof.shareX) + shareY = inHex(msgProof.shareY) + nullifier = inHex(msgProof.nullifier) case validationRes: of Valid: debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload @@ -987,10 +1018,10 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi of Spam: debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - if spamHandler.isSome: - let handler = spamHandler.get + if spamHandler.isSome(): + let handler = spamHandler.get() handler(wakumessage) - return pubsub.ValidationResult.Reject + return pubsub.ValidationResult.Reject # set a validator for the supplied pubsubTopic let pb = PubSub(node.wakuRelay) pb.addValidator(pubsubTopic, validator)