refactor(relay): improve waku_relay apis and add tests

This commit is contained in:
Lorenzo Delgado 2023-02-10 15:17:50 +01:00 committed by GitHub
parent 2f390ce884
commit 274101af43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 331 additions and 94 deletions

View File

@ -28,10 +28,15 @@ when defined(waku_exp_store_resume):
import ./v2/waku_store/test_resume
# Waku relay test suite
import
./v2/waku_relay/test_waku_relay,
./v2/waku_relay/test_wakunode_relay
import
# Waku v2 tests
./v2/test_wakunode,
./v2/test_wakunode_relay,
# Waku LightPush
./v2/test_waku_lightpush,
./v2/test_wakunode_lightpush,

View File

@ -6,19 +6,19 @@ import
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
libp2p/crypto/crypto
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/[server, client, base64, utils],
../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache],
../../waku/v2/protocol/waku_relay,
../../waku/v2/utils/time,
./testlib/common
proc testWakuNode(): WakuNode =
let
proc testWakuNode(): WakuNode =
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
@ -29,7 +29,7 @@ proc testWakuNode(): WakuNode =
suite "REST API - Relay":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
@ -68,13 +68,13 @@ suite "REST API - Relay":
check:
# Node should be subscribed to default + new topics
PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len
toSeq(node.wakuRelay.subscribedTopics).len == 1 + pubSubTopics.len
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions":
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
@ -94,7 +94,7 @@ suite "REST API - Relay":
restServer.start()
let pubSubTopics = @[
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3"),
PubSubTopic("pubsub-topic-y")
@ -122,7 +122,7 @@ suite "REST API - Relay":
await node.stop()
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
@ -157,7 +157,7 @@ suite "REST API - Relay":
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool:
response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == Base64String.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
@ -172,8 +172,8 @@ suite "REST API - Relay":
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
await node.start()
@ -190,10 +190,11 @@ suite "REST API - Relay":
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# At this stage the node is only subscribed to the default topic
require(PubSub(node.wakuRelay).topics.len == 1)
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
# When
let newTopics = @[
@ -202,10 +203,10 @@ suite "REST API - Relay":
PubSubTopic("pubsub-topic-3")
]
discard await client.relayPostSubscriptionsV1(newTopics)
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: Base64String.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic),
payload: Base64String.encode("TEST-PAYLOAD"),
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022))
))

View File

@ -5,7 +5,10 @@ import
import
../../test_helpers
export switch
proc newTestSwitch*(key=none(PrivateKey), address=none(MultiAddress)): Switch =
let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
return newStandardSwitch(some(peerKey), addrs=peerAddr)
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
return newStandardSwitch(some(peerKey), addrs=peerAddr)

View File

@ -0,0 +1,232 @@
{.used.}
import
std/[options, sequtils, strutils],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/messages
import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_relay,
../testlib/switch,
../testlib/common
proc noopRawHandler(): PubsubRawHandler =
var handler: PubsubRawHandler
handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard
handler
proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch, triggerSelf = self).tryGet()
await proto.start()
let protocolMatcher = proc(proto: string): bool {.gcsafe.} =
return proto.startsWith(WakuRelayCodec)
switch.mount(proto, protocolMatcher)
return proto
suite "Waku Relay":
asyncTest "subscribe and add handler to topics":
## Setup
let nodeA = await newTestWakuRelay()
## Given
let
networkA = "test-network1"
networkB = "test-network2"
## when
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
## Then
check:
nodeA.isSubscribed(networkA)
nodeA.isSubscribed(networkB)
let subscribedTopics = toSeq(nodeA.subscribedTopics)
check:
subscribedTopics.len == 2
subscribedTopics.contains(networkA)
subscribedTopics.contains(networkB)
## Cleanup
await nodeA.stop()
asyncTest "unsubscribe all handlers from topic":
## Setup
let nodeA = await newTestWakuRelay()
## Given
let
networkA = "test-network1"
networkB = "test-network2"
networkC = "test-network3"
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler())
let topics = toSeq(nodeA.subscribedTopics)
require:
topics.len == 3
topics.contains(networkA)
topics.contains(networkB)
topics.contains(networkC)
## When
nodeA.unsubscribeAll(networkA)
## Then
check:
nodeA.isSubscribed(networkB)
nodeA.isSubscribed(networkC)
not nodeA.isSubscribed(networkA)
let subscribedTopics = toSeq(nodeA.subscribedTopics)
check:
subscribedTopics.len == 2
subscribedTopics.contains(networkB)
subscribedTopics.contains(networkC)
not subscribedTopics.contains(networkA)
## Cleanup
await nodeA.stop()
asyncTest "publish a message into a topic":
## Setup
let
srcSwitch = newTestSwitch()
srcPeerManager = PeerManager.new(srcSwitch)
srcNode = await newTestWakuRelay(srcSwitch)
dstSwitch = newTestSwitch()
dstPeerManager = PeerManager.new(dstSwitch)
dstNode = await newTestWakuRelay(dstSwitch)
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
require:
conn.isSome()
## Given
let networkTopic = "test-network1"
let message = fakeWakuMessage()
# Self subscription (triggerSelf = true)
let srcSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message))
srcNode.subscribe(networkTopic, srcSubsHandler)
# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)
## When
discard await srcNode.publish(networkTopic, message)
## Then
require:
await srcSubsFut.withTimeout(5.seconds)
await dstSubsFut.withTimeout(5.seconds)
let (srcTopic, srcMsg) = srcSubsFut.read()
check:
srcTopic == networkTopic
srcMsg == message
let (dstTopic, dstMsg) = dstSubsFut.read()
check:
dstTopic == networkTopic
dstMsg == message
## Cleanup
await allFutures(srcSwitch.stop(), dstSwitch.stop())
asyncTest "content topic validator as a message subscription filter":
## Setup
let
srcSwitch = newTestSwitch()
srcPeerManager = PeerManager.new(srcSwitch)
srcNode = await newTestWakuRelay(srcSwitch)
dstSwitch = newTestSwitch()
dstPeerManager = PeerManager.new(dstSwitch)
dstNode = await newTestWakuRelay(dstSwitch)
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
require:
conn.isSome()
## Given
let networkTopic = "test-network1"
let contentTopic = "test-content1"
let message = fakeWakuMessage(contentTopic=contentTopic)
let messages = @[
fakeWakuMessage(contentTopic="any"),
fakeWakuMessage(contentTopic="any"),
fakeWakuMessage(contentTopic="any"),
message,
fakeWakuMessage(contentTopic="any"),
]
# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))
dstNode.subscribe(networkTopic, dstSubsHandler)
await sleepAsync(500.millis)
# Validator
proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} =
let msg = WakuMessage.decode(msg.data)
if msg.isErr():
return ValidationResult.Ignore
# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic:
return ValidationResult.Reject
return ValidationResult.Accept
dstNode.addValidator(networkTopic, validator)
## When
for msg in messages:
discard await srcNode.publish(networkTopic, msg)
## Then
require:
await dstSubsFut.withTimeout(5.seconds)
let (dstTopic, dstMsg) = dstSubsFut.read()
check:
dstTopic == networkTopic
dstMsg == message
## Cleanup
await allFutures(srcSwitch.stop(), dstSwitch.stop())

View File

@ -21,9 +21,9 @@ import
../../waku/v2/utils/peers,
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_relay,
../test_helpers,
./testlib/common
#./testlib/testutils
../../test_helpers,
../testlib/common,
../testlib/testutils
template sourceDir: string = currentSourcePath.parentDir()
const KEY_PATH = sourceDir / "resources/test_key.pem"
@ -43,7 +43,7 @@ procSuite "WakuNode - Relay":
await node1.mountRelay()
check:
GossipSub(node1.wakuRelay).heartbeatFut.isNil == false
GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false
# Relay protocol starts if mounted before node start
@ -55,13 +55,13 @@ procSuite "WakuNode - Relay":
check:
# Relay has not yet started as node has not yet started
GossipSub(node2.wakuRelay).heartbeatFut.isNil
GossipSub(node2.wakuRelay).heartbeatFut.isNil()
await node2.start()
check:
# Relay started on node start
GossipSub(node2.wakuRelay).heartbeatFut.isNil == false
GossipSub(node2.wakuRelay).heartbeatFut.isNil() == false
await allFutures([node1.stop(), node2.stop()])
@ -87,8 +87,10 @@ procSuite "WakuNode - Relay":
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await allFutures(
node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -105,13 +107,13 @@ procSuite "WakuNode - Relay":
await sleepAsync(500.millis)
await node1.publish(pubSubTopic, message)
await sleepAsync(500.millis)
## Then
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
await node3.stop()
## Cleanup
await allFutures(node1.stop(), node2.stop(), node3.stop())
asyncTest "filtering relayed messages using topic validators":
## test scenario:
@ -157,23 +159,26 @@ procSuite "WakuNode - Relay":
var completionFutValidatorAcc = newFuture[bool]()
var completionFutValidatorRej = newFuture[bool]()
# set a topic validator for pubSubTopic
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
## the validator that only allows messages with contentTopic1 to be relayed
check:
topic == pubSubTopic
let msg = WakuMessage.decode(message.data)
if msg.isOk():
# only relay messages with contentTopic1
if msg.value().contentTopic == contentTopic1:
result = ValidationResult.Accept
completionFutValidatorAcc.complete(true)
else:
result = ValidationResult.Reject
completionFutValidatorRej.complete(true)
# set a topic validator for pubSubTopic
let pb = PubSub(node2.wakuRelay)
pb.addValidator(pubSubTopic, validator)
let msg = WakuMessage.decode(message.data)
if msg.isErr():
completionFutValidatorAcc.complete(false)
return ValidationResult.Reject
# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic1:
completionFutValidatorRej.complete(true)
return ValidationResult.Reject
completionFutValidatorAcc.complete(true)
return ValidationResult.Accept
node2.wakuRelay.addValidator(pubSubTopic, validator)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -208,7 +213,8 @@ procSuite "WakuNode - Relay":
await allFutures(node1.stop(), node2.stop(), node3.stop())
asyncTest "Stats of peer sending wrong WakuMessages are updated":
# TODO: Add a function to validate the WakuMessage integrity
xasyncTest "Stats of peer sending wrong WakuMessages are updated":
# Create 2 nodes
let nodes = toSeq(0..1).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))

View File

@ -9,7 +9,7 @@ import
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
import
../../../protocol/waku_message,
../serdes,
../base64
@ -24,7 +24,7 @@ type RelayWakuMessage* = object
timestamp*: Option[int64]
type
type
RelayGetMessagesResponse* = seq[RelayWakuMessage]
RelayPostMessagesRequest* = RelayWakuMessage
@ -44,7 +44,7 @@ proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
)
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] =
let
let
payload = ?msg.payload.decode()
contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version))
@ -59,10 +59,6 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic)
{.raises: [IOError, Defect].} =
writer.writeValue(string(topic))
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} =
writer.beginRecord()
@ -79,14 +75,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError, Defect].} =
value = Base64String(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic)
{.raises: [SerializationError, IOError, Defect].} =
pubsubTopic = PubSubTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic)
{.raises: [SerializationError, IOError, Defect].} =
contentTopic = ContentTopic(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} =
var
@ -122,5 +110,5 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
payload: payload.get(),
contentTopic: contentTopic,
version: version,
timestamp: timestamp
timestamp: timestamp
)

View File

@ -503,10 +503,6 @@ proc startRelay*(node: WakuNode) {.async.} =
## Setup relay protocol
# Subscribe to the default PubSub topics
for topic in node.wakuRelay.defaultPubsubTopics:
node.subscribe(topic)
# Resume previous relay connections
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
@ -532,7 +528,6 @@ proc mountRelay*(node: WakuNode,
let initRes = WakuRelay.new(
node.switch,
defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics),
triggerSelf = triggerSelf
)
if initRes.isErr():
@ -553,6 +548,13 @@ proc mountRelay*(node: WakuNode,
info "relay mounted successfully"
# TODO: As part of #1545, remove this and update the tests cases
node.subscribe(DefaultPubsubTopic)
# Subscribe to topics
for topic in topics:
node.subscribe(topic)
## Waku filter

View File

@ -8,7 +8,7 @@ else:
{.push raises: [].}
import
std/[sequtils, tables],
std/[tables, sequtils, hashes],
stew/results,
chronos,
chronicles,
@ -37,7 +37,6 @@ type
type
WakuRelay* = ref object of GossipSub
defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics
WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler
@ -78,15 +77,12 @@ method initPubSub(w: WakuRelay) {.raises: [InitializationError].} =
w.initProtocolHandler()
proc new*(T: type WakuRelay,
switch: Switch,
defaultPubsubTopics: seq[PubsubTopic] = @[],
triggerSelf: bool = true): WakuRelayResult[T] =
proc new*(T: type WakuRelay, switch: Switch, triggerSelf: bool = true): WakuRelayResult[T] =
proc msgIdProvider(msg: messages.Message): Result[MessageID, ValidationResult] =
let hash = MultiHash.digest("sha2-256", msg.data)
if hash.isErr():
ok(($msg.data.hash).toBytes())
ok(toBytes($hashes.hash(msg.data)))
else:
ok(hash.value.data.buffer)
@ -100,25 +96,28 @@ proc new*(T: type WakuRelay,
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
)
# Rejects messages that are not WakuMessage
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
let msg = WakuMessage.decode(message.data)
if msg.isOk():
return ValidationResult.Accept
return ValidationResult.Reject
# Add validator to all default pubsub topics
for pubSubTopic in defaultPubsubTopics:
wr.addValidator(pubSubTopic, validator)
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
wr.defaultPubsubTopics = defaultPubsubTopics
# TODO: Add a function to validate the WakuMessage integrity
# # Rejects messages that are not WakuMessage
# proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
# let msg = WakuMessage.decode(message.data)
# if msg.isOk():
# return ValidationResult.Accept
# return ValidationResult.Reject
# # Add validator to all default pubsub topics
# for pubSubTopic in defaultPubsubTopics:
# wr.addValidator(pubSubTopic, validator)
ok(wr)
method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} =
procCall GossipSub(w).addValidator(topic, handler)
method start*(w: WakuRelay) {.async.} =
debug "start"
await procCall GossipSub(w).start()

View File

@ -715,8 +715,7 @@ proc addRLNRelayValidator*(wakuRlnRelay: WakuRLNRelay,
handler(wakumessage)
return pubsub.ValidationResult.Reject
# set a validator for the supplied pubsubTopic
let pb = PubSub(wakuRelay)
pb.addValidator(pubsubTopic, validator)
wakuRelay.addValidator(pubsubTopic, validator)
proc mountRlnRelayStatic*(wakuRelay: WakuRelay,
group: seq[IDCommitment],
@ -943,7 +942,7 @@ proc mount(wakuRelay: WakuRelay,
# getMembershipCredentials returns all credentials in keystore as sequence matching the filter
let allMatchingCredentials = readCredentialsRes.get()
# if any is found, we return the first credential, otherwise credentials is none
# if any is found, we return the first credential, otherwise credentials is none
if allMatchingCredentials.len() > 0:
credentials = some(allMatchingCredentials[0])
else:
@ -999,7 +998,7 @@ proc mount(wakuRelay: WakuRelay,
return err("dynamic rln-relay could not be mounted: " & rlnRelayRes.error())
let wakuRlnRelay = rlnRelayRes.get()
if persistCredentials:
credentials = some(MembershipCredentials(identityCredential: wakuRlnRelay.identityCredential,
membershipGroups: @[MembershipGroup(membershipContract: rlnMembershipContract, treeIndex: wakuRlnRelay.membershipIndex)]
))
@ -1026,9 +1025,11 @@ proc new*(T: type WakuRlnRelay,
# relay protocol is the prerequisite of rln-relay
if wakuRelay.isNil():
return err("WakuRelay protocol is not mounted")
# check whether the pubsub topic is supported at the relay level
if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics:
return err("The relay protocol does not support the configured pubsub topic")
# TODO: Review this. The Waku Relay instance is no longer keeping track of the default pubsub topics
# # check whether the pubsub topic is supported at the relay level
# if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics:
# return err("The relay protocol does not support the configured pubsub topic")
debug "rln-relay input validation passed"
waku_rln_relay_mounting_duration_seconds.nanosecondTime: