fix: timestamp based validation (#3406)
parent e4a4313d82
commit 39e65dea28
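
This change moves RLN message validation from purely epoch-based to timestamp-based checks. Every WakuMessage now carries a timestamp: validateMessage rejects a message whose timestamp drifts more than rlnMaxTimestampGap seconds from the node's clock, and additionally rejects it when the proof's epoch does not equal the epoch recomputed from that timestamp via calcEpoch. The timestamp also becomes part of the RLN signal (toRLNSignal), so a proof now binds payload, content topic, and timestamp together. A minimal sketch of the two new checks, mirroring the code added to validateMessage below (timestamps are in nanoseconds, hence the division by 1e9):

    let currentTime = getTime().toUnixFloat()        # node clock, in seconds
    let messageTime = msg.timestamp.float64 / 1e9    # message timestamp, ns -> s

    # check 1: the claimed timestamp must be close to the node's clock
    if uint64(abs(currentTime - messageTime)) > rlnPeer.rlnMaxTimestampGap:
      return MessageValidationResult.Invalid

    # check 2: the proof's epoch must be the epoch that timestamp falls into
    if proof.epoch != rlnPeer.calcEpoch(messageTime):
      return MessageValidationResult.Invalid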
@@ -1,7 +1,7 @@
 {.used.}
 
 import
-  std/[options, os, sequtils, times, tempfiles],
+  std/[options, os, sequtils, tempfiles],
   stew/byteutils,
   stew/shims/net as stewNet,
   testutils/unittests,
@@ -17,7 +17,10 @@ import
     waku_rln_relay/protocol_metrics,
     waku_keystore,
   ],
-  ./rln/waku_rln_relay_utils
+  ./rln/waku_rln_relay_utils,
+  ../testlib/[wakucore, futures, wakunode, testutils]
+
+from std/times import epochTime
 
 suite "Waku rln relay":
   test "key_gen Nim Wrappers":
@@ -686,7 +689,7 @@ suite "Waku rln relay":
     # it is a duplicate
     assert isDuplicate3, "duplicate should be found"
 
-  asyncTest "validateMessageAndUpdateLog test":
+  asyncTest "validateMessageAndUpdateLog: against epoch gap":
    let index = MembershipIndex(5)
 
    let wakuRlnConfig = WakuRlnConfig(
@@ -700,27 +703,31 @@
     let wakuRlnRelay = (await WakuRlnRelay.new(wakuRlnConfig)).valueOr:
       raiseAssert $error
 
     # get the current epoch time
-    let time = epochTime()
+    let time_1 = epochTime()
 
-    # create some messages from the same peer and append rln proof to them, except wm4
     var
-      wm1 = WakuMessage(payload: "Valid message".toBytes())
+      # create some messages from the same peer and append rln proof to them, except wm4
+      wm1 = WakuMessage(payload: "Valid message".toBytes(), timestamp: now())
       # another message in the same epoch as wm1, it will break the messaging rate limit
-      wm2 = WakuMessage(payload: "Spam".toBytes())
-      # wm3 points to the next epoch
-      wm3 = WakuMessage(payload: "Valid message".toBytes())
-      wm4 = WakuMessage(payload: "Invalid message".toBytes())
+      wm2 = WakuMessage(payload: "Spam message".toBytes(), timestamp: now())
+
+    await sleepAsync(1.seconds)
+    let time_2 = epochTime()
 
-    wakuRlnRelay.unsafeAppendRLNProof(wm1, time).isOkOr:
+    var
+      # wm3 points to the next epoch because of the sleep
+      wm3 = WakuMessage(payload: "Valid message".toBytes(), timestamp: now())
+      wm4 = WakuMessage(payload: "Invalid message".toBytes(), timestamp: now())
+
+    wakuRlnRelay.unsafeAppendRLNProof(wm1, time_1).isOkOr:
       raiseAssert $error
-    wakuRlnRelay.unsafeAppendRLNProof(wm2, time).isOkOr:
+    wakuRlnRelay.unsafeAppendRLNProof(wm2, time_1).isOkOr:
       raiseAssert $error
-    wakuRlnRelay.unsafeAppendRLNProof(wm3, time + float64(wakuRlnRelay.rlnEpochSizeSec)).isOkOr:
+
+    wakuRlnRelay.unsafeAppendRLNProof(wm3, time_2).isOkOr:
       raiseAssert $error
 
     # validate messages
     # validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
     let
       msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1)
       # wm2 is published within the same Epoch as wm1 and should be found as spam
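
Note: the rewritten test no longer fakes wm3's epoch by passing time + rlnEpochSizeSec to the proof generator; since validation now recomputes the epoch from the message's own timestamp, the proof and the timestamp must agree. Instead, the test sleeps one second and stamps wm3/wm4 with a fresh now() and time_2, which only lands in the next epoch if this test's epochSizeSec is 1 second (its config lines sit outside this hunk).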
@@ -736,6 +743,48 @@
       msgValidate3 == MessageValidationResult.Valid
       msgValidate4 == MessageValidationResult.Invalid
 
+  asyncTest "validateMessageAndUpdateLog: against timestamp gap":
+    let index = MembershipIndex(5)
+
+    let wakuRlnConfig = WakuRlnConfig(
+      dynamic: false,
+      credIndex: some(index),
+      userMessageLimit: 10,
+      epochSizeSec: 10,
+      treePath: genTempPath("rln_tree", "waku_rln_relay_2"),
+    )
+
+    let wakuRlnRelay = (await WakuRlnRelay.new(wakuRlnConfig)).valueOr:
+      raiseAssert $error
+
+    # usually it's 20 seconds, but we shrink it for testing purposes, which makes the test faster
+    wakuRlnRelay.rlnMaxTimestampGap = 1
+
+    var time = epochTime()
+
+    var
+      wm1 = WakuMessage(payload: "timestamp message".toBytes(), timestamp: now())
+      wm2 = WakuMessage(payload: "timestamp message".toBytes(), timestamp: now())
+
+    wakuRlnRelay.unsafeAppendRLNProof(wm1, time).isOkOr:
+      raiseAssert $error
+
+    wakuRlnRelay.unsafeAppendRLNProof(wm2, time).isOkOr:
+      raiseAssert $error
+
+    # the first message validates: its timestamp is still within the allowed gap
+    let msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1)
+
+    # wait for 2 seconds so wm2's timestamp drifts beyond the allowed gap
+    await sleepAsync(2.seconds)
+
+    # the second message is invalidated: its timestamp now differs too much from the current time
+    let msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2)
+
+    check:
+      msgValidate1 == MessageValidationResult.Valid
+      msgValidate2 == MessageValidationResult.Invalid
+
   asyncTest "validateMessageAndUpdateLog: multiple senders with same external nullifier":
     let index1 = MembershipIndex(5)
     let index2 = MembershipIndex(6)
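
Note: with rlnMaxTimestampGap forced down to 1 second, the 2-second sleep alone flips wm2 from valid to invalid; both proofs are constructed identically. The rejected comparison reduces to roughly:

    # wm2 was stamped with now() at creation; validation runs ~2 s later, so
    # uint64(abs(currentTime - messageTime)) ~= 2 > rlnMaxTimestampGap (1)  =>  Invalid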
@@ -766,9 +815,11 @@
 
     # create messages from different peers and append rln proofs to them
     var
-      wm1 = WakuMessage(payload: "Valid message from sender 1".toBytes())
+      wm1 =
+        WakuMessage(payload: "Valid message from sender 1".toBytes(), timestamp: now())
       # another message in the same epoch as wm1, it will break the messaging rate limit
-      wm2 = WakuMessage(payload: "Valid message from sender 2".toBytes())
+      wm2 =
+        WakuMessage(payload: "Valid message from sender 2".toBytes(), timestamp: now())
 
     wakuRlnRelay1.appendRLNProof(wm1, time).isOkOr:
       raiseAssert $error
@@ -132,7 +132,8 @@ procSuite "WakuNode - RLN relay":
     let payload = "Hello".toBytes()
 
     # prepare the epoch
-    var message = WakuMessage(payload: @payload, contentTopic: contentTopic)
+    var message =
+      WakuMessage(payload: @payload, contentTopic: contentTopic, timestamp: now())
     doAssert(node1.wakuRlnRelay.unsafeAppendRLNProof(message, epochTime()).isOk())
 
     debug "Nodes participating in the test",
@@ -221,19 +222,25 @@
     var messages1: seq[WakuMessage] = @[]
     var messages2: seq[WakuMessage] = @[]
 
-    let epochTime = epochTime()
+    var epochTime = epochTime()
 
     for i in 0 ..< 3:
       var message = WakuMessage(
-        payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0]
+        payload: ("Payload_" & $i).toBytes(),
+        timestamp: now(),
+        contentTopic: contentTopics[0],
       )
       nodes[0].wakuRlnRelay.unsafeAppendRLNProof(message, epochTime).isOkOr:
         raiseAssert $error
       messages1.add(message)
 
+    epochTime = epochTime()
+
     for i in 0 ..< 3:
       var message = WakuMessage(
-        payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1]
+        payload: ("Payload_" & $i).toBytes(),
+        timestamp: now(),
+        contentTopic: contentTopics[1],
       )
       nodes[1].wakuRlnRelay.unsafeAppendRLNProof(message, epochTime).isOkOr:
         raiseAssert $error
@@ -364,8 +371,12 @@
     # check the proof is generated correctly outside when block to avoid duplication
     let rateLimitProof = rateLimitProofRes.get().encode().buffer
 
-    let message =
-      WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof)
+    let message = WakuMessage(
+      payload: @payload,
+      contentTopic: contentTopic,
+      proof: rateLimitProof,
+      timestamp: now(),
+    )
 
     ## node1 publishes a message with an invalid rln proof, the message is then relayed to node2 which in turn
     ## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
@@ -452,24 +463,36 @@
     await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
 
     # get the current epoch time
-    let time = epochTime()
+    let time_1 = epochTime()
 
     # create some messages with rate limit proofs
     var
-      wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
+      wm1 = WakuMessage(
+        payload: "message 1".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
       # another message in the same epoch as wm1, it will break the messaging rate limit
-      wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
-      # wm3 points to the next epoch
-      wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic)
-      wm4 = WakuMessage(payload: "message 4".toBytes(), contentTopic: contentTopic)
+      wm2 = WakuMessage(
+        payload: "message 2".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
+
+    await sleepAsync(1000.millis)
+    let time_2 = epochTime()
 
-    node3.wakuRlnRelay.unsafeAppendRLNProof(wm1, time).isOkOr:
+    var
+      wm3 = WakuMessage(
+        payload: "message 3".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
+      wm4 = WakuMessage(
+        payload: "message 4".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
+
+    node3.wakuRlnRelay.unsafeAppendRLNProof(wm1, time_1).isOkOr:
       raiseAssert $error
-    node3.wakuRlnRelay.unsafeAppendRLNProof(wm2, time).isOkOr:
+    node3.wakuRlnRelay.unsafeAppendRLNProof(wm2, time_1).isOkOr:
       raiseAssert $error
 
-    node3.wakuRlnRelay.unsafeAppendRLNProof(
-      wm3, time + float64(node3.wakuRlnRelay.rlnEpochSizeSec)
-    ).isOkOr:
+    node3.wakuRlnRelay.unsafeAppendRLNProof(wm3, time_2).isOkOr:
       raiseAssert $error
 
     # relay handler for node3
@@ -700,8 +723,12 @@
     # Given some messages with rln proofs
     let time = epochTime()
     var
-      msg1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
-      msg2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
+      msg1 = WakuMessage(
+        payload: "message 1".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
+      msg2 = WakuMessage(
+        payload: "message 2".toBytes(), timestamp: now(), contentTopic: contentTopic
+      )
 
     node1.wakuRlnRelay.unsafeAppendRLNProof(msg1, time).isOkOr:
       raiseAssert $error
@@ -260,7 +260,7 @@ suite "Waku v2 Rest API - Relay":
       RelayWakuMessage(
         payload: base64.encode("TEST-PAYLOAD"),
         contentTopic: some(DefaultContentTopic),
-        timestamp: some(int64(2022)),
+        timestamp: some(now()),
       ),
     )
 
@@ -488,7 +488,7 @@ suite "Waku v2 Rest API - Relay":
       RelayWakuMessage(
         payload: base64.encode("TEST-PAYLOAD"),
         contentTopic: some(DefaultContentTopic),
-        timestamp: some(int64(2022)),
+        timestamp: some(now()),
       )
     )
 
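Note: these REST relay tests previously posted the fixed timestamp int64(2022); under timestamp-based validation such a message would fail the rlnMaxTimestampGap check, so they now stamp messages with now().
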
@@ -93,6 +93,7 @@ type WakuRLNRelay* = ref object of RootObj
   nullifierLog*: OrderedTable[Epoch, Table[Nullifier, ProofMetadata]]
   lastEpoch*: Epoch # the epoch of the last published rln message
   rlnEpochSizeSec*: uint64
+  rlnMaxTimestampGap*: uint64
   rlnMaxEpochGap*: uint64
   groupManager*: GroupManager
   onFatalErrorAction*: OnFatalErrorHandler
@@ -103,6 +104,7 @@ type WakuRLNRelay* = ref object of RootObj
 proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
   ## gets time `t` as `float64` with subsecond resolution in the fractional part
   ## and returns its corresponding rln `Epoch` value
 
   let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
   return toEpoch(e)
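
Note: calcEpoch is plain integer bucketing of Unix time. A worked example with hypothetical values: for rlnEpochSizeSec = 10 and t = 1700000123.45, e = uint64(1700000123.45 / 10.0) = 170000012, so all timestamps inside the same 10-second window map to the same Epoch. The new proof.epoch != calcEpoch(messageTime) check in validateMessage relies on exactly this bucketing.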
@@ -211,25 +213,26 @@ proc validateMessage*(
   # track message count for metrics
   waku_rln_messages_total.inc()
 
-  # checks if the `msg`'s epoch is far from the current epoch
-  # it corresponds to the validation of rln external nullifier
-  # get current rln epoch
-  let epoch: Epoch = rlnPeer.getCurrentEpoch()
+  # checks if the message's timestamp is within acceptable range
+  let currentTime = getTime().toUnixFloat()
+  let messageTime = msg.timestamp.float64 / 1e9
 
-  let
-    msgEpoch = proof.epoch
-    # calculate the gaps
-    gap = absDiff(epoch, msgEpoch)
+  let timeDiff = uint64(abs(currentTime - messageTime))
 
-  trace "epoch info", currentEpoch = fromEpoch(epoch), msgEpoch = fromEpoch(msgEpoch)
+  debug "time info",
+    currentTime = currentTime, messageTime = messageTime, msgHash = msg.hash
 
-  # validate the epoch
-  if gap > rlnPeer.rlnMaxEpochGap:
-    # message's epoch is too old or too ahead
-    # accept messages whose epoch is within +-MaxEpochGap from the current epoch
-    warn "invalid message: epoch gap exceeds a threshold",
-      gap = gap, payloadLen = msg.payload.len, msgEpoch = fromEpoch(proof.epoch)
-    waku_rln_invalid_messages_total.inc(labelValues = ["invalid_epoch"])
+  if timeDiff > rlnPeer.rlnMaxTimestampGap:
+    warn "invalid message: timestamp difference exceeds threshold",
+      timeDiff = timeDiff, maxTimestampGap = rlnPeer.rlnMaxTimestampGap
+    waku_rln_invalid_messages_total.inc(labelValues = ["invalid_timestamp"])
     return MessageValidationResult.Invalid
 
+  let computedEpoch = rlnPeer.calcEpoch(messageTime)
+  if proof.epoch != computedEpoch:
+    warn "invalid message: timestamp mismatches epoch",
+      proofEpoch = fromEpoch(proof.epoch), computedEpoch = fromEpoch(computedEpoch)
+    waku_rln_invalid_messages_total.inc(labelValues = ["timestamp_mismatch"])
+    return MessageValidationResult.Invalid
+
   let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot)
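
Note: the two rejection paths guard different things. invalid_timestamp bounds how far a message's claimed timestamp may drift from the receiving node's clock, replacing the old comparison of the proof's epoch against the node's current epoch; timestamp_mismatch then ties the RLN proof to the message by recomputing the epoch from that timestamp. A sender can no longer choose the proof's epoch independently of the timestamp it publishes.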
@@ -242,8 +245,9 @@
 
   # verify the proof
   let
-    contentTopicBytes = msg.contentTopic.toBytes
-    input = concat(msg.payload, contentTopicBytes)
+    contentTopicBytes = toBytes(msg.contentTopic)
+    timestampBytes = toBytes(msg.timestamp.uint64)
+    input = concat(msg.payload, contentTopicBytes, @(timestampBytes))
 
   waku_rln_proof_verification_total.inc()
   waku_rln_proof_verification_duration_seconds.nanosecondTime:
@@ -265,6 +269,8 @@
   if proofMetadataRes.isErr():
     waku_rln_errors_total.inc(labelValues = ["proof_metadata_extraction"])
     return MessageValidationResult.Invalid
+
+  let msgEpoch = proof.epoch
   let hasDup = rlnPeer.hasDuplicate(msgEpoch, proofMetadataRes.get())
   if hasDup.isErr():
     waku_rln_errors_total.inc(labelValues = ["duplicate_check"])
@@ -305,10 +311,12 @@ proc validateMessageAndUpdateLog*(
 
 proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] =
   ## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module
-  ## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence
+  ## it extracts the `contentTopic`, `timestamp` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence
 
   let
-    contentTopicBytes = wakumessage.contentTopic.toBytes()
-    output = concat(wakumessage.payload, contentTopicBytes)
+    contentTopicBytes = toBytes(wakumessage.contentTopic)
+    timestampBytes = toBytes(wakumessage.timestamp.uint64)
+    output = concat(wakumessage.payload, contentTopicBytes, @(timestampBytes))
   return output
 
 proc appendRLNProof*(
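
Note: toRLNSignal and the input built in validateMessage must stay byte-for-byte identical, since prover and verifier hash the same signal. After this change the signal is the payload bytes, then the content-topic bytes, then the fixed-size encoding of the uint64 timestamp (8 bytes). A small illustration with hypothetical message values:

    let msg = WakuMessage(
      payload: "hi".toBytes(),       # 2 bytes
      contentTopic: "/a/1/b/proto",  # 12 bytes
      timestamp: 1_700_000_000_000_000_000,
    )
    doAssert toRLNSignal(msg).len == 2 + 12 + 8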
@@ -479,6 +487,7 @@ proc mount(
     nonceManager: NonceManager.init(conf.userMessageLimit, conf.epochSizeSec.float),
     rlnEpochSizeSec: conf.epochSizeSec,
     rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.epochSizeSec)), 1),
+    rlnMaxTimestampGap: uint64(MaxClockGapSeconds),
     onFatalErrorAction: conf.onFatalErrorAction,
   )
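
Note: both gap limits derive from the same MaxClockGapSeconds constant (its value is defined elsewhere in the module; the test comment above suggests 20 seconds). Assuming MaxClockGapSeconds = 20.0 for illustration:

    # epochSizeSec = 10  =>  rlnMaxEpochGap = max(uint64(20.0 / 10.0), 1) = 2
    # epochSizeSec = 30  =>  rlnMaxEpochGap = max(uint64(20.0 / 30.0), 1) = 1
    # rlnMaxTimestampGap = uint64(20.0) = 20, independent of the epoch size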