mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-26 23:02:30 +00:00
feat: add WakuMessage validation in gossipsub (#1537)
This commit is contained in:
parent
7cbb0bb28f
commit
55bac8dedf
@ -52,6 +52,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
node.peerManager.start()
|
||||
if not await node.startDiscv5():
|
||||
error "failed to start discv5"
|
||||
quit(1)
|
||||
|
@ -48,6 +48,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
node.peerManager.start()
|
||||
if not await node.startDiscv5():
|
||||
error "failed to start discv5"
|
||||
quit(1)
|
||||
|
@ -1,12 +1,12 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/os,
|
||||
std/[os,sysrand,sequtils,tables,math],
|
||||
stew/byteutils,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/peerid,
|
||||
@ -19,8 +19,11 @@ import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/node/peer_manager,
|
||||
../../waku/v2/utils/peers,
|
||||
../../waku/v2/node/waku_node
|
||||
|
||||
../../waku/v2/node/waku_node,
|
||||
../../waku/v2/protocol/waku_relay,
|
||||
../test_helpers,
|
||||
./testlib/common
|
||||
#./testlib/testutils
|
||||
|
||||
template sourceDir: string = currentSourcePath.parentDir()
|
||||
const KEY_PATH = sourceDir / "resources/test_key.pem"
|
||||
@ -28,7 +31,7 @@ const CERT_PATH = sourceDir / "resources/test_cert.pem"
|
||||
|
||||
procSuite "WakuNode - Relay":
|
||||
let rng = crypto.newRng()
|
||||
|
||||
|
||||
asyncTest "Relay protocol is started correctly":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -61,7 +64,7 @@ procSuite "WakuNode - Relay":
|
||||
GossipSub(node2.wakuRelay).heartbeatFut.isNil == false
|
||||
|
||||
await allFutures([node1.stop(), node2.stop()])
|
||||
|
||||
|
||||
asyncTest "Messages are correctly relayed":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -109,7 +112,7 @@ procSuite "WakuNode - Relay":
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
||||
|
||||
asyncTest "filtering relayed messages using topic validators":
|
||||
## test scenario:
|
||||
## node1 and node3 set node2 as their relay node
|
||||
@ -204,7 +207,39 @@ procSuite "WakuNode - Relay":
|
||||
(await completionFutValidatorRej.withTimeout(10.seconds)) == true
|
||||
|
||||
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
||||
|
||||
|
||||
asyncTest "Stats of peer sending wrong WakuMessages are updated":
|
||||
# Create 2 nodes
|
||||
let nodes = toSeq(0..1).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||
|
||||
# Start all the nodes and mount relay with
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
|
||||
# Connect nodes
|
||||
let conn = await nodes[0].peerManager.dialPeer(nodes[1].switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
|
||||
require conn.isSome
|
||||
|
||||
# Node 1 subscribes to topic
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
|
||||
nodes[1].subscribe(DefaultPubsubTopic, handler)
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
|
||||
for i in 0..4:
|
||||
discard await nodes[0].wakuRelay.publish(DefaultPubsubTopic, urandom(1*(10^2)))
|
||||
|
||||
# Wait for gossip
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Verify that node 1 has received 5 invalid messages from node 0
|
||||
# meaning that message validity is enforced to gossip messages
|
||||
var peerStats = nodes[1].wakuRelay.peerStats
|
||||
check:
|
||||
peerStats[nodes[0].switch.peerInfo.peerId].topicInfos[DefaultPubsubTopic].invalidMessageDeliveries == 5.0
|
||||
|
||||
await allFutures(nodes.mapIt(it.stop()))
|
||||
|
||||
asyncTest "Messages are relayed between two websocket nodes":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -380,7 +415,7 @@ procSuite "WakuNode - Relay":
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
|
||||
await allFutures(node1.stop(), node2.stop())
|
||||
|
||||
asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)":
|
||||
@ -389,7 +424,7 @@ procSuite "WakuNode - Relay":
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8100), wsEnabled = true )
|
||||
|
||||
|
||||
let
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -97,6 +97,17 @@ proc new*(T: type WakuRelay,
|
||||
verifySignature = false,
|
||||
maxMessageSize = MaxWakuMessageSize
|
||||
)
|
||||
|
||||
# Rejects messages that are not WakuMessage
|
||||
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
|
||||
let msg = WakuMessage.decode(message.data)
|
||||
if msg.isOk():
|
||||
return ValidationResult.Accept
|
||||
return ValidationResult.Reject
|
||||
|
||||
# Add validator to all default pubsub topics
|
||||
for pubSubTopic in defaultPubsubTopics:
|
||||
wr.addValidator(pubSubTopic, validator)
|
||||
except InitializationError:
|
||||
return err("initialization error: " & getCurrentExceptionMsg())
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user