refactor(relay): improve waku_relay apis and add tests

This commit is contained in:
Lorenzo Delgado 2023-02-10 15:17:50 +01:00 committed by GitHub
parent b6c2c71aa6
commit f07e6301c8
11 changed files with 331 additions and 94 deletions

View File

@ -28,10 +28,15 @@ when defined(waku_exp_store_resume):
import ./v2/waku_store/test_resume
# Waku relay test suite
import
./v2/waku_relay/test_waku_relay,
./v2/waku_relay/test_wakunode_relay
import
# Waku v2 tests
./v2/test_wakunode,
./v2/test_wakunode_relay,
# Waku LightPush
./v2/test_waku_lightpush,
./v2/test_wakunode_lightpush,

View File

@ -6,13 +6,13 @@ import
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
libp2p/crypto/crypto
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/[server, client, base64, utils],
../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache],
../../waku/v2/protocol/waku_relay,
../../waku/v2/utils/time,
./testlib/common
@ -68,7 +68,7 @@ suite "REST API - Relay":
check:
# Node should be subscribed to default + new topics
PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len
toSeq(node.wakuRelay.subscribedTopics).len == 1 + pubSubTopics.len
await restServer.stop()
await restServer.closeWait()
@ -192,7 +192,8 @@ suite "REST API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# At this stage the node is only subscribed to the default topic
require(PubSub(node.wakuRelay).topics.len == 1)
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
# When

View File

@ -5,6 +5,9 @@ import
import
../../test_helpers
export switch
proc newTestSwitch*(key=none(PrivateKey), address=none(MultiAddress)): Switch =
let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())

View File

@ -0,0 +1,232 @@
{.used.}
import
std/[options, sequtils, strutils],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/messages
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_relay,
../testlib/switch,
../testlib/common
proc noopRawHandler(): PubsubRawHandler =
var handler: PubsubRawHandler
handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard
handler
proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch, triggerSelf = self).tryGet()
await proto.start()
let protocolMatcher = proc(proto: string): bool {.gcsafe.} =
return proto.startsWith(WakuRelayCodec)
switch.mount(proto, protocolMatcher)
return proto
suite "Waku Relay":
asyncTest "subscribe and add handler to topics":
## Setup
let nodeA = await newTestWakuRelay()
## Given
let
networkA = "test-network1"
networkB = "test-network2"
## when
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
## Then
check:
nodeA.isSubscribed(networkA)
nodeA.isSubscribed(networkB)
let subscribedTopics = toSeq(nodeA.subscribedTopics)
check:
subscribedTopics.len == 2
subscribedTopics.contains(networkA)
subscribedTopics.contains(networkB)
## Cleanup
await nodeA.stop()
asyncTest "unsubscribe all handlers from topic":
## Setup
let nodeA = await newTestWakuRelay()
## Given
let
networkA = "test-network1"
networkB = "test-network2"
networkC = "test-network3"
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler())
let topics = toSeq(nodeA.subscribedTopics)
require:
topics.len == 3
topics.contains(networkA)
topics.contains(networkB)
topics.contains(networkC)
## When
nodeA.unsubscribeAll(networkA)
## Then
check:
nodeA.isSubscribed(networkB)
nodeA.isSubscribed(networkC)
not nodeA.isSubscribed(networkA)
let subscribedTopics = toSeq(nodeA.subscribedTopics)
check:
subscribedTopics.len == 2
subscribedTopics.contains(networkB)
subscribedTopics.contains(networkC)
not subscribedTopics.contains(networkA)
## Cleanup
await nodeA.stop()
asyncTest "publish a message into a topic":
## Setup
let
srcSwitch = newTestSwitch()
srcPeerManager = PeerManager.new(srcSwitch)
srcNode = await newTestWakuRelay(srcSwitch)
dstSwitch = newTestSwitch()
dstPeerManager = PeerManager.new(dstSwitch)
dstNode = await newTestWakuRelay(dstSwitch)
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
require:
conn.isSome()
## Given
let networkTopic = "test-network1"
let message = fakeWakuMessage()
# Self subscription (triggerSelf = true)
let srcSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message))
srcNode.subscribe(networkTopic, srcSubsHandler)
# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)
## When
discard await srcNode.publish(networkTopic, message)
## Then
require:
await srcSubsFut.withTimeout(5.seconds)
await dstSubsFut.withTimeout(5.seconds)
let (srcTopic, srcMsg) = srcSubsFut.read()
check:
srcTopic == networkTopic
srcMsg == message
let (dstTopic, dstMsg) = dstSubsFut.read()
check:
dstTopic == networkTopic
dstMsg == message
## Cleanup
await allFutures(srcSwitch.stop(), dstSwitch.stop())
asyncTest "content topic validator as a message subscription filter":
## Setup
let
srcSwitch = newTestSwitch()
srcPeerManager = PeerManager.new(srcSwitch)
srcNode = await newTestWakuRelay(srcSwitch)
dstSwitch = newTestSwitch()
dstPeerManager = PeerManager.new(dstSwitch)
dstNode = await newTestWakuRelay(dstSwitch)
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
require:
conn.isSome()
## Given
let networkTopic = "test-network1"
let contentTopic = "test-content1"
let message = fakeWakuMessage(contentTopic=contentTopic)
let messages = @[
fakeWakuMessage(contentTopic="any"),
fakeWakuMessage(contentTopic="any"),
fakeWakuMessage(contentTopic="any"),
message,
fakeWakuMessage(contentTopic="any"),
]
# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)
# Validator
proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} =
let msg = WakuMessage.decode(msg.data)
if msg.isErr():
return ValidationResult.Ignore
# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic:
return ValidationResult.Reject
return ValidationResult.Accept
dstNode.addValidator(networkTopic, validator)
## When
for msg in messages:
discard await srcNode.publish(networkTopic, msg)
## Then
require:
await dstSubsFut.withTimeout(5.seconds)
let (dstTopic, dstMsg) = dstSubsFut.read()
check:
dstTopic == networkTopic
dstMsg == message
## Cleanup
await allFutures(srcSwitch.stop(), dstSwitch.stop())

View File

@ -21,9 +21,9 @@ import
../../waku/v2/utils/peers,
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_relay,
../test_helpers,
./testlib/common
#./testlib/testutils
../../test_helpers,
../testlib/common,
../testlib/testutils
template sourceDir: string = currentSourcePath.parentDir()
const KEY_PATH = sourceDir / "resources/test_key.pem"
@ -43,7 +43,7 @@ procSuite "WakuNode - Relay":
await node1.mountRelay()
check:
GossipSub(node1.wakuRelay).heartbeatFut.isNil == false
GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false
# Relay protocol starts if mounted before node start
@ -55,13 +55,13 @@ procSuite "WakuNode - Relay":
check:
# Relay has not yet started as node has not yet started
GossipSub(node2.wakuRelay).heartbeatFut.isNil
GossipSub(node2.wakuRelay).heartbeatFut.isNil()
await node2.start()
check:
# Relay started on node start
GossipSub(node2.wakuRelay).heartbeatFut.isNil == false
GossipSub(node2.wakuRelay).heartbeatFut.isNil() == false
await allFutures([node1.stop(), node2.stop()])
@ -87,8 +87,10 @@ procSuite "WakuNode - Relay":
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await allFutures(
node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -105,13 +107,13 @@ procSuite "WakuNode - Relay":
await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message)
await sleepAsync(500.millis)
## Then
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
await node3.stop()
## Cleanup
await allFutures(node1.stop(), node2.stop(), node3.stop())
asyncTest "filtering relayed messages using topic validators":
## test scenario:
@ -157,23 +159,26 @@ procSuite "WakuNode - Relay":
var completionFutValidatorAcc = newFuture[bool]()
var completionFutValidatorRej = newFuture[bool]()
# set a topic validator for pubSubTopic
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
## the validator that only allows messages with contentTopic1 to be relayed
check:
topic == pubSubTopic
let msg = WakuMessage.decode(message.data)
if msg.isOk():
# only relay messages with contentTopic1
if msg.value().contentTopic == contentTopic1:
result = ValidationResult.Accept
completionFutValidatorAcc.complete(true)
else:
result = ValidationResult.Reject
completionFutValidatorRej.complete(true)
# set a topic validator for pubSubTopic
let pb = PubSub(node2.wakuRelay)
pb.addValidator(pubSubTopic, validator)
let msg = WakuMessage.decode(message.data)
if msg.isErr():
completionFutValidatorAcc.complete(false)
return ValidationResult.Reject
# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic1:
completionFutValidatorRej.complete(true)
return ValidationResult.Reject
completionFutValidatorAcc.complete(true)
return ValidationResult.Accept
node2.wakuRelay.addValidator(pubSubTopic, validator)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -208,7 +213,8 @@ procSuite "WakuNode - Relay":
await allFutures(node1.stop(), node2.stop(), node3.stop())
asyncTest "Stats of peer sending wrong WakuMessages are updated":
# TODO: Add a function to validate the WakuMessage integrity
xasyncTest "Stats of peer sending wrong WakuMessages are updated":
# Create 2 nodes
let nodes = toSeq(0..1).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))

View File

@ -59,10 +59,6 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic)
{.raises: [IOError, Defect].} =
writer.writeValue(string(topic))
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} =
writer.beginRecord()
@ -79,14 +75,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError, Defect].} =
value = Base64String(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic)
{.raises: [SerializationError, IOError, Defect].} =
pubsubTopic = PubSubTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic)
{.raises: [SerializationError, IOError, Defect].} =
contentTopic = ContentTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} =
var

View File

@ -503,10 +503,6 @@ proc startRelay*(node: WakuNode) {.async.} =
## Setup relay protocol
# Subscribe to the default PubSub topics
for topic in node.wakuRelay.defaultPubsubTopics:
node.subscribe(topic)
# Resume previous relay connections
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
@ -532,7 +528,6 @@ proc mountRelay*(node: WakuNode,
let initRes = WakuRelay.new(
node.switch,
defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics),
triggerSelf = triggerSelf
)
if initRes.isErr():
@ -553,6 +548,13 @@ proc mountRelay*(node: WakuNode,
info "relay mounted successfully"
# TODO: As part of #1545, remove this and update the tests cases
node.subscribe(DefaultPubsubTopic)
# Subscribe to topics
for topic in topics:
node.subscribe(topic)
## Waku filter

View File

@ -8,7 +8,7 @@ else:
{.push raises: [].}
import
std/[sequtils, tables],
std/[tables, sequtils, hashes],
stew/results,
chronos,
chronicles,
@ -37,7 +37,6 @@ type
type
WakuRelay* = ref object of GossipSub
defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics
WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler
@ -78,15 +77,12 @@ method initPubSub(w: WakuRelay) {.raises: [InitializationError].} =
w.initProtocolHandler()
proc new*(T: type WakuRelay,
switch: Switch,
defaultPubsubTopics: seq[PubsubTopic] = @[],
triggerSelf: bool = true): WakuRelayResult[T] =
proc new*(T: type WakuRelay, switch: Switch, triggerSelf: bool = true): WakuRelayResult[T] =
proc msgIdProvider(msg: messages.Message): Result[MessageID, ValidationResult] =
let hash = MultiHash.digest("sha2-256", msg.data)
if hash.isErr():
ok(($msg.data.hash).toBytes())
ok(toBytes($hashes.hash(msg.data)))
else:
ok(hash.value.data.buffer)
@ -100,25 +96,28 @@ proc new*(T: type WakuRelay,
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
)
# Rejects messages that are not WakuMessage
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
let msg = WakuMessage.decode(message.data)
if msg.isOk():
return ValidationResult.Accept
return ValidationResult.Reject
# Add validator to all default pubsub topics
for pubSubTopic in defaultPubsubTopics:
wr.addValidator(pubSubTopic, validator)
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
wr.defaultPubsubTopics = defaultPubsubTopics
# TODO: Add a function to validate the WakuMessage integrity
# # Rejects messages that are not WakuMessage
# proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
# let msg = WakuMessage.decode(message.data)
# if msg.isOk():
# return ValidationResult.Accept
# return ValidationResult.Reject
# # Add validator to all default pubsub topics
# for pubSubTopic in defaultPubsubTopics:
# wr.addValidator(pubSubTopic, validator)
ok(wr)
method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} =
procCall GossipSub(w).addValidator(topic, handler)
method start*(w: WakuRelay) {.async.} =
debug "start"
await procCall GossipSub(w).start()

View File

@ -715,8 +715,7 @@ proc addRLNRelayValidator*(wakuRlnRelay: WakuRLNRelay,
handler(wakumessage)
return pubsub.ValidationResult.Reject
# set a validator for the supplied pubsubTopic
let pb = PubSub(wakuRelay)
pb.addValidator(pubsubTopic, validator)
wakuRelay.addValidator(pubsubTopic, validator)
proc mountRlnRelayStatic*(wakuRelay: WakuRelay,
group: seq[IDCommitment],
@ -1026,9 +1025,11 @@ proc new*(T: type WakuRlnRelay,
# relay protocol is the prerequisite of rln-relay
if wakuRelay.isNil():
return err("WakuRelay protocol is not mounted")
# check whether the pubsub topic is supported at the relay level
if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics:
return err("The relay protocol does not support the configured pubsub topic")
# TODO: Review this. The Waku Relay instance is no longer keeping track of the default pubsub topics
# # check whether the pubsub topic is supported at the relay level
# if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics:
# return err("The relay protocol does not support the configured pubsub topic")
debug "rln-relay input validation passed"
waku_rln_relay_mounting_duration_seconds.nanosecondTime: