refactor(rln): decouple rln types from waku message type

This commit is contained in:
Lorenzo Delgado 2022-11-22 18:29:43 +01:00 committed by GitHub
parent 1e8e60caed
commit 299ee3eb5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 79 deletions

View File

@ -22,10 +22,10 @@ import libp2p/[switch, # manage transports, a single entry poi
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
import
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_lightpush/rpc,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store,
../../waku/v2/node/[waku_node, waku_payload, waku_metrics],
../../waku/v2/node/dnsdisc/waku_dnsdisc,
@ -190,7 +190,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) =
except ValueError:
# Formatting fail. Print chat line in any case.
echo chatLine
c.prompt = false
showChatPrompt(c)
trace "Printing message", topic=DefaultPubsubTopic, chatLine,
@ -225,21 +225,26 @@ proc publish(c: Chat, line: string) =
contentTopic: c.contentTopic, version: version, timestamp: getNanosecondTime(time))
when defined(rln):
if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic,
# for future version when we support more than one rln protected content topic,
# we should check the message content topic as well
let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time))
if not success:
debug "could not append rate limit proof to the message", success=success
else:
debug "rate limit proof is appended to the message", success=success
# TODO move it to log after doogfooding
let msgEpoch = fromEpoch(message.proof.epoch)
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(message.proof.epoch):
let decodeRes = RateLimitProof.init(message.proof)
if decodeRes.isErr():
error "could not decode RLN proof"
let proof = decodeRes.get()
# TODO move it to log after dogfooding
let msgEpoch = fromEpoch(proof.epoch)
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(proof.epoch):
echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!"
else:
echo "--rln epoch: ", msgEpoch
# update the last epoch
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
c.node.wakuRlnRelay.lastEpoch = proof.epoch
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
@ -252,22 +257,27 @@ proc publish(c: Chat, line: string) =
var message = WakuMessage(payload: chat2pb.buffer,
contentTopic: c.contentTopic, version: 0, timestamp: getNanosecondTime(time))
when defined(rln):
if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic,
if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic,
# we should check the message content topic as well
let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time))
if not success:
debug "could not append rate limit proof to the message", success=success
else:
debug "rate limit proof is appended to the message", success=success
# TODO move it to log after doogfooding
let msgEpoch = fromEpoch(message.proof.epoch)
let decodeRes = RateLimitProof.init(message.proof)
if decodeRes.isErr():
error "could not decode the RLN proof"
let proof = decodeRes.get()
# TODO move it to log after dogfooding
let msgEpoch = fromEpoch(proof.epoch)
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch:
echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!"
else:
echo "--rln epoch: ", msgEpoch
# update the last epoch
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
c.node.wakuRlnRelay.lastEpoch = proof.epoch
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
@ -316,7 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} =
let address = await c.transp.readLine()
if address.len > 0:
await c.connectToNodes(@[address])
elif line.startsWith("/nick"):
# Set a new nickname
c.nick = await readNick(c.transp)
@ -325,9 +335,9 @@ proc writeAndPrint(c: Chat) {.async.} =
elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."
await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
echo "quitting..."
await c.node.stop()
@ -364,7 +374,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
let
transp = fromPipe(rfd)
conf = Chat2Conf.load()
# set log level
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
@ -375,7 +385,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
Port(uint16(conf.udpPort) + conf.portsShift))
node = WakuNode.new(conf.nodekey, conf.listenAddress,
Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extTcpPort,
extIp, extTcpPort,
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
wsEnabled = conf.websocketSupport,
wssEnabled = conf.websocketSecureSupport)
@ -387,9 +397,9 @@ proc processInput(rfd: AsyncFD) {.async.} =
if conf.relay:
await node.mountRelay(conf.topics.split(" "))
await node.mountLibp2pPing()
let nick = await readNick(transp)
echo "Welcome, " & nick & "!"
@ -398,7 +408,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
subscribed: true,
connected: false,
started: true,
nick: nick,
nick: nick,
prompt: false,
contentTopic: conf.contentTopic,
symKey: generateSymKey(conf.contentTopic))
@ -406,13 +416,13 @@ proc processInput(rfd: AsyncFD) {.async.} =
if conf.staticnodes.len > 0:
echo "Connecting to static peers..."
await connectToNodes(chat, conf.staticnodes)
var dnsDiscoveryUrl = none(string)
if conf.fleet != Fleet.none:
# Use DNS discovery to connect to selected fleet
echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..."
if conf.fleet == Fleet.test:
dnsDiscoveryUrl = some("enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im")
else:
@ -437,7 +447,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
trace "resolving", domain=domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(),
resolver)
if wakuDnsDiscovery.isOk:
@ -467,7 +477,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
elif discoveredNodes.len > 0:
echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers"
storenode = some(discoveredNodes[rand(0..len(discoveredNodes) - 1)])
if storenode.isSome():
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())
@ -487,7 +497,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
if queryRes.isOk():
storeHandler(queryRes.value)
# NOTE Must be mounted after relay
if conf.lightpushnode != "":
await mountLightPush(node)
@ -514,7 +524,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
trace "Hit subscribe handler", topic
let decoded = WakuMessage.decode(data)
if decoded.isOk():
if decoded.get().contentTopic == chat.contentTopic:
chat.printReceivedMessage(decoded.get())
@ -524,7 +534,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
when defined(rln):
when defined(rln):
if conf.rlnRelay:
info "WakuRLNRelay is enabled"
@ -539,9 +549,9 @@ proc processInput(rfd: AsyncFD) {.async.} =
showChatPrompt(chat)
proc registrationHandler(txHash: string) {.gcsafe, closure.} =
echo "You are registered to the rln membership contract, find details of your registration transaction in https://goerli.etherscan.io/tx/0x", txHash
echo "rln-relay preparation is in progress..."
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
@ -569,7 +579,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
if conf.metricsServer:
startMetricsServer(conf.metricsServerAddress,
Port(conf.metricsServerPort + conf.portsShift))
await chat.readWriteLoop()

View File

@ -854,14 +854,22 @@ suite "Waku rln relay":
for index, x in shareX3.mpairs: shareX3[index] = 3
let shareY3 = shareX3
## TODO: when zerokit rln is integrated, RateLimitProof should be initialized passing a rlnIdentifier too (now implicitely set to 0)
proc encodeAndGetBuf(proof: RateLimitProof): seq[byte] =
return proof.encode().buffer
let
wm1 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier1, shareX: shareX1, shareY: shareY1))
nullifier: nullifier1,
shareX: shareX1,
shareY: shareY1).encodeAndGetBuf())
wm2 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier2, shareX: shareX2, shareY: shareY2))
nullifier: nullifier2,
shareX: shareX2,
shareY: shareY2).encodeAndGetBuf())
wm3 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier3, shareX: shareX3, shareY: shareY3))
nullifier: nullifier3,
shareX: shareX3,
shareY: shareY3).encodeAndGetBuf())
# check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares
# no duplicate for wm1 should be found, since the log is empty

View File

@ -235,8 +235,9 @@ procSuite "WakuNode - RLN relay":
memKeys = node1.wakuRlnRelay.membershipKeyPair,
memIndex = MembershipIndex(1),
epoch = epoch)
doAssert(rateLimitProofRes.isOk())
let rateLimitProof = rateLimitProofRes.value
require:
rateLimitProofRes.isOk()
let rateLimitProof = rateLimitProofRes.get().encode().buffer
let message = WakuMessage(payload: @payload,
contentTopic: contentTopic,

View File

@ -15,18 +15,14 @@ import
../../common/protobuf,
../utils/time
when defined(rln):
import
./waku_rln_relay/protocol_types
const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default
type
PubsubTopic* = string
ContentTopic* = string
const
const
DefaultPubsubTopic*: PubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
DefaultContentTopic*: ContentTopic = ContentTopic("/waku/2/default-content/proto")
@ -41,9 +37,9 @@ type WakuMessage* = object
# this field will be used in the rln-relay protocol
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
when defined(rln):
proof*: RateLimitProof
proof*: seq[byte]
# The ephemeral field indicates if the message should
# be stored. bools and uints are
# be stored. bools and uints are
# equivalent in serialization of the protobuf
ephemeral*: bool
@ -58,7 +54,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
buf.write3(3, message.version)
buf.write3(10, zint64(message.timestamp))
when defined(rln):
buf.write3(21, message.proof.encode())
buf.write3(21, message.proof)
buf.write3(31, uint64(message.ephemeral))
buf.finish3()
@ -76,11 +72,11 @@ proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
discard ?pb.getField(10, timestamp)
msg.timestamp = Timestamp(timestamp)
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec
# Experimental: this is part of https://rfc.vac.dev/spec/17/ spec
when defined(rln):
var proofBytes: seq[byte]
discard ?pb.getField(21, proofBytes)
msg.proof = ?RateLimitProof.init(proofBytes)
if ?pb.getField(21, proofBytes):
msg.proof = proofBytes
var ephemeral: uint
if ?pb.getField(31, ephemeral):

View File

@ -583,20 +583,29 @@ proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool
## otherwise, returns false
## Returns an error if it cannot check for duplicates
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(nullifier: msg.proof.nullifier,
shareX: msg.proof.shareX, shareY: msg.proof.shareY)
let proofMD = ProofMetadata(
nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
# check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch):
if not rlnPeer.nullifierLog.hasKey(proof.epoch):
return ok(false)
try:
if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD):
if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
# there is an identical record, ignore rhe mag
return ok(false)
# check for a message with the same nullifier but different secret shares
let matched = rlnPeer.nullifierLog[msg.proof.epoch].filterIt((
let matched = rlnPeer.nullifierLog[proof.epoch].filterIt((
it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or
(it.shareY != proofMD.shareY)))
@ -615,21 +624,31 @@ proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] =
## saves it in the `nullifierLog` of the `rlnPeer`
## Returns an error if it cannot update the log
let proofMD = ProofMetadata(nullifier: msg.proof.nullifier,
shareX: msg.proof.shareX, shareY: msg.proof.shareY)
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(
nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
debug "proof metadata", proofMD = proofMD
# check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch):
rlnPeer.nullifierLog[msg.proof.epoch] = @[proofMD]
if not rlnPeer.nullifierLog.hasKey(proof.epoch):
rlnPeer.nullifierLog[proof.epoch] = @[proofMD]
return ok(true)
try:
# check if an identical record exists
if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD):
if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
return ok(true)
# add proofMD to the log
rlnPeer.nullifierLog[msg.proof.epoch].add(proofMD)
rlnPeer.nullifierLog[proof.epoch].add(proofMD)
return ok(true)
except KeyError as e:
return err("the epoch was not found")
@ -680,6 +699,11 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
## the `msg` does not violate the rate limit
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
## if `timeOption` is supplied, then the current epoch is calculated based on that
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return MessageValidationResult.Invalid
let proof = decodeRes.get()
# track message count for metrics
waku_rln_messages_total.inc()
@ -695,7 +719,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
debug "current epoch", currentEpoch = fromEpoch(epoch)
let
msgEpoch = msg.proof.epoch
msgEpoch = proof.epoch
# calculate the gaps
gap = absDiff(epoch, msgEpoch)
@ -711,8 +735,8 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
return MessageValidationResult.Invalid
## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247
if not rlnPeer.validateRoot(msg.proof.merkleRoot):
debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex())
if not rlnPeer.validateRoot(proof.merkleRoot):
debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex())
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
# return MessageValidationResult.Invalid
@ -723,7 +747,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
waku_rln_proof_verification_total.inc()
waku_rln_proof_verification_duration_seconds.nanosecondTime:
let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, msg.proof)
let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, proof)
if proofVerificationRes.isErr():
waku_rln_errors_total.inc(labelValues=["proof_verification"])
@ -749,7 +773,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
# it will never error out
discard rlnPeer.updateLog(msg)
debug "message is valid", payload = string.fromBytes(msg.payload)
let rootIndex = rlnPeer.validMerkleRoots.find(msg.proof.merkleRoot)
let rootIndex = rlnPeer.validMerkleRoots.find(proof.merkleRoot)
waku_rln_valid_messages_total.observe(rootIndex.toFloat())
return MessageValidationResult.Valid
@ -775,10 +799,10 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage,
memIndex = rlnPeer.membershipIndex,
epoch = calcEpoch(senderEpochTime))
if proof.isErr:
if proof.isErr():
return false
msg.proof = proof.value
msg.proof = proof.value.encode().buffer
return true
proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] =
@ -955,10 +979,10 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
## the message validation logic is according to https://rfc.vac.dev/spec/17/
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"
let msg = WakuMessage.decode(message.data)
if msg.isOk():
let decodeRes = WakuMessage.decode(message.data)
if decodeRes.isOk():
let
wakumessage = msg.value()
wakumessage = decodeRes.value
payload = string.fromBytes(wakumessage.payload)
# check the contentTopic
@ -966,15 +990,22 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
return pubsub.ValidationResult.Accept
let decodeRes = RateLimitProof.init(wakumessage.proof)
if decodeRes.isErr():
return pubsub.ValidationResult.Reject
let msgProof = decodeRes.get()
# validate the message
let
validationRes = node.wakuRlnRelay.validateMessage(wakumessage)
proof = toHex(wakumessage.proof.proof)
epoch = fromEpoch(wakumessage.proof.epoch)
root = inHex(wakumessage.proof.merkleRoot)
shareX = inHex(wakumessage.proof.shareX)
shareY = inHex(wakumessage.proof.shareY)
nullifier = inHex(wakumessage.proof.nullifier)
proof = toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
case validationRes:
of Valid:
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
@ -987,10 +1018,10 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
of Spam:
debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
if spamHandler.isSome:
let handler = spamHandler.get
if spamHandler.isSome():
let handler = spamHandler.get()
handler(wakumessage)
return pubsub.ValidationResult.Reject
return pubsub.ValidationResult.Reject
# set a validator for the supplied pubsubTopic
let pb = PubSub(node.wakuRelay)
pb.addValidator(pubsubTopic, validator)