chore: deprecating protected topics in favor of protected shards (#2983)

This commit is contained in:
gabrielmer 2024-08-19 12:56:22 +02:00 committed by GitHub
parent 67439057fb
commit e51ffe0759
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 81 deletions

View File

@ -31,14 +31,14 @@ suite "WakuNode2 - Validators":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
)
# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
# Protected shard and key to sign
let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7)
let secretKey = SkSecretKey
.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6")
.expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable
let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable
# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))
@ -48,10 +48,12 @@ suite "WakuNode2 - Validators":
# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics: seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)
var signedShards: seq[ProtectedShard]
for shard, publicKey in shardsPublicKeys:
signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey))
node.wakuRelay.addSignedShardsValidator(
signedShards, spamProtectedShard.clusterId
)
# Connect the nodes in a full mesh
for i in 0 ..< 5:
@ -72,7 +74,7 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
await sleepAsync(500.millis)
# Each node publishes 10 signed messages
@ -80,7 +82,7 @@ suite "WakuNode2 - Validators":
for j in 0 ..< 10:
var msg = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: now(),
ephemeral: true,
@ -88,9 +90,9 @@ suite "WakuNode2 - Validators":
# Include signature
msg.meta =
secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0 .. 63]
secretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[0 .. 63]
discard await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some($spamProtectedShard), msg)
# Wait for gossip
await sleepAsync(2.seconds)
@ -103,7 +105,7 @@ suite "WakuNode2 - Validators":
for i in 0 ..< 5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0
v.topicInfos[spamProtectedShard].invalidMessageDeliveries == 0.0
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))
@ -114,14 +116,14 @@ suite "WakuNode2 - Validators":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
)
# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
# Protected shard and key to sign
let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7)
let secretKey = SkSecretKey
.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6")
.expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable
let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable
# Non whitelisted secret key
let wrongSecretKey = SkSecretKey
@ -136,10 +138,12 @@ suite "WakuNode2 - Validators":
# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics: seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)
var signedShards: seq[ProtectedShard]
for shard, publicKey in shardsPublicKeys:
signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey))
node.wakuRelay.addSignedShardsValidator(
signedShards, spamProtectedShard.clusterId
)
# Connect the nodes in a full mesh
for i in 0 ..< 5:
@ -160,7 +164,7 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
await sleepAsync(500.millis)
# Each node sends 5 messages, signed but with a non-whitelisted key (total = 25)
@ -168,42 +172,42 @@ suite "WakuNode2 - Validators":
for j in 0 ..< 5:
var msg = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: now(),
ephemeral: true,
)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[
0 .. 63
]
discard await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some($spamProtectedShard), msg)
# Each node sends 5 messages that are not signed (total = 25)
for i in 0 ..< 5:
for j in 0 ..< 5:
let unsignedMessage = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: now(),
ephemeral: true,
)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage)
# Each node sends 5 messages that dont contain timestamp (total = 25)
for i in 0 ..< 5:
for j in 0 ..< 5:
let unsignedMessage = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: 0,
ephemeral: true,
)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage)
# Each node sends 5 messages way BEFORE than the current timestmap (total = 25)
for i in 0 ..< 5:
@ -211,12 +215,12 @@ suite "WakuNode2 - Validators":
let beforeTimestamp = now() - getNanosecondTime(6 * 60)
let unsignedMessage = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: beforeTimestamp,
ephemeral: true,
)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage)
# Each node sends 5 messages way LATER than the current timestmap (total = 25)
for i in 0 ..< 5:
@ -224,12 +228,12 @@ suite "WakuNode2 - Validators":
let afterTimestamp = now() - getNanosecondTime(6 * 60)
let unsignedMessage = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: afterTimestamp,
ephemeral: true,
)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage)
# Since we have a full mesh with 5 nodes and each one publishes 25+25+25+25+25 msgs
# there are 625 messages being sent.
@ -243,7 +247,7 @@ suite "WakuNode2 - Validators":
msgRejected = 0
for i in 0 ..< 5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int
msgRejected += v.topicInfos[spamProtectedShard].invalidMessageDeliveries.int
if msgReceived == 125 and msgRejected == 500:
break
@ -262,14 +266,14 @@ suite "WakuNode2 - Validators":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
)
# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
# Protected shard and key to sign
let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7)
let secretKey = SkSecretKey
.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6")
.expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable
let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable
# Non whitelisted secret key
let wrongSecretKey = SkSecretKey
@ -288,15 +292,17 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe(spamProtectedTopic, handler)
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
await sleepAsync(500.millis)
# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics: seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)
var signedShards: seq[ProtectedShard]
for shard, publicKey in shardsPublicKeys:
signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey))
node.wakuRelay.addSignedShardsValidator(
signedShards, spamProtectedShard.clusterId
)
# nodes[0] is connected only to nodes[1]
let connOk1 = await nodes[0].peerManager.connectRelay(
@ -321,26 +327,26 @@ suite "WakuNode2 - Validators":
for j in 0 ..< 50:
let unsignedMessage = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: now(),
ephemeral: true,
)
discard await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[0].publish(some($spamProtectedShard), unsignedMessage)
# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0 ..< 50:
var msg = WakuMessage(
payload: urandom(1 * (10 ^ 3)),
contentTopic: spamProtectedTopic,
contentTopic: spamProtectedShard,
version: 2,
timestamp: now(),
ephemeral: true,
)
# Sign the message with a wrong key
msg.meta =
wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0 .. 63]
discard await nodes[0].publish(some(spamProtectedTopic), msg)
wrongSecretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[0 .. 63]
discard await nodes[0].publish(some($spamProtectedShard), msg)
# Wait for gossip
await sleepAsync(2.seconds)
@ -353,7 +359,7 @@ suite "WakuNode2 - Validators":
# peer1 got invalid messages from peer0
let p0Id = nodes[0].peerInfo.peerId
check:
nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedTopic].invalidMessageDeliveries ==
nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedShard].invalidMessageDeliveries ==
100.0
# peer1 did not gossip further, so no other node rx invalid messages
@ -362,7 +368,7 @@ suite "WakuNode2 - Validators":
if k == p0Id and i == 1:
continue
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0
v.topicInfos[spamProtectedShard].invalidMessageDeliveries == 0.0
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

View File

@ -19,19 +19,21 @@ import
../common/confutils/envvar/std/net as confEnvvarNet,
../common/logging,
../waku_enr,
../node/peer_manager
../node/peer_manager,
../waku_core/topics/pubsub_topic
include ../waku_core/message/default_values
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
type ConfResult*[T] = Result[T, string]
type ProtectedTopic* = object
topic*: string
key*: secp256k1.SkPublicKey
type EthRpcUrl* = distinct string
type ProtectedShard* = object
shard*: uint16
key*: secp256k1.SkPublicKey
type StartUpCommand* = enum
noCommand # default, runs waku
generateRlnKeystore # generates a new RLN keystore
@ -134,10 +136,17 @@ type WakuNodeConf* = object
## Application-level configuration
protectedTopics* {.
desc:
"Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
defaultValue: newSeq[ProtectedTopic](0),
"Deprecated. Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
defaultValue: newSeq[ProtectedShard](0),
name: "protected-topic"
.}: seq[ProtectedTopic]
.}: seq[ProtectedShard]
protectedShards* {.
desc:
"Shards and its public keys to be used for message validation, shard:pubkey. Argument may be repeated.",
defaultValue: newSeq[ProtectedShard](0),
name: "protected-shard"
.}: seq[ProtectedShard]
## General node config
clusterId* {.
@ -694,20 +703,36 @@ proc parseCmdArg*[T](_: type seq[T], s: string): seq[T] {.raises: [ValueError].}
proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type ProtectedTopic, p: string): T =
# TODO: Remove when removing protected-topic configuration
proc isNumber(x: string): bool =
try:
discard parseInt(x)
result = true
except ValueError:
result = false
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
let elements = p.split(":")
if elements.len != 2:
raise newException(
ValueError, "Invalid format for protected topic expected topic:publickey"
ValueError, "Invalid format for protected shard expected shard:publickey"
)
let publicKey = secp256k1.SkPublicKey.fromHex(elements[1])
if publicKey.isErr:
raise newException(ValueError, "Invalid public key")
return ProtectedTopic(topic: elements[0], key: publicKey.get())
if isNumber(elements[0]):
return ProtectedShard(shard: uint16.parseCmdArg(elements[0]), key: publicKey.get())
proc completeCmdArg*(T: type ProtectedTopic, val: string): seq[string] =
# TODO: Remove when removing protected-topic configuration
let shard = RelayShard.parse(elements[0]).valueOr:
raise newException(
ValueError,
"Invalid pubsub topic. Pubsub topics must be in the format /waku/2/rs/<cluster-id>/<shard-id>",
)
return ProtectedShard(shard: shard.shardId, key: publicKey.get())
proc completeCmdArg*(T: type ProtectedShard, val: string): seq[string] =
return @[]
proc completeCmdArg*(T: type IpAddress, val: string): seq[string] =
@ -769,18 +794,18 @@ proc readValue*(
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var ProtectedTopic
r: var TomlReader, value: var ProtectedShard
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(ProtectedTopic, r.readValue(string))
value = parseCmdArg(ProtectedShard, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var EnvvarReader, value: var ProtectedTopic
r: var EnvvarReader, value: var ProtectedShard
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(ProtectedTopic, r.readValue(string))
value = parseCmdArg(ProtectedShard, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())

View File

@ -175,16 +175,16 @@ proc setupProtocols(
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
# Add validation keys to protected topics
var subscribedProtectedTopics: seq[ProtectedTopic]
for topicKey in conf.protectedTopics:
if topicKey.topic notin pubsubTopics:
warn "protected topic not in subscribed pubsub topics, skipping adding validator",
protectedTopic = topicKey.topic, subscribedTopics = pubsubTopics
var subscribedProtectedShards: seq[ProtectedShard]
for shardKey in conf.protectedShards:
if shardKey.shard notin conf.shards:
warn "protected shard not in subscribed shards, skipping adding validator",
protectedShard = shardKey.shard, subscribedShards = shards
continue
subscribedProtectedTopics.add(topicKey)
subscribedProtectedShards.add(shardKey)
notice "routing only signed traffic",
protectedTopic = topicKey.topic, publicKey = topicKey.key
node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics)
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
# Enable Rendezvous Discovery protocol when Relay is enabled
try:

View File

@ -50,30 +50,34 @@ proc withinTimeWindow*(msg: WakuMessage): bool =
return true
return false
proc addSignedTopicsValidator*(w: WakuRelay, protectedTopics: seq[ProtectedTopic]) =
debug "adding validator to signed topics"
proc addSignedShardsValidator*(
w: WakuRelay, protectedShards: seq[ProtectedShard], clusterId: uint16
) =
debug "adding validator to signed shards", protectedShards, clusterId
proc validator(
topic: string, msg: WakuMessage
): Future[errors.ValidationResult] {.async.} =
var outcome = errors.ValidationResult.Reject
for protectedTopic in protectedTopics:
if (protectedTopic.topic == topic):
for protectedShard in protectedShards:
let topicString =
$RelayShard(clusterId: clusterId, shardId: uint16(protectedShard.shard))
if (topicString == topic):
if msg.timestamp != 0:
if msg.withinTimeWindow():
let msgHash = SkMessage(topic.msgHash(msg))
let recoveredSignature = SkSignature.fromRaw(msg.meta)
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, protectedTopic.key):
if recoveredSignature.get.verify(msgHash, protectedShard.key):
outcome = errors.ValidationResult.Accept
if outcome != errors.ValidationResult.Accept:
debug "signed topic validation failed",
topic = topic, publicTopicKey = protectedTopic.key
topic = topic, publicShardKey = protectedShard.key
waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
return outcome
return errors.ValidationResult.Accept
w.addValidator(validator, "signed topic validation failed")
w.addValidator(validator, "signed shard validation failed")