Fix: light-mode relay for all light protocols (#529)

* Fix: light-mode relay for all light protocols

* Clear up confusing use of overloaded concepts
This commit is contained in:
Hanno Cornelius 2021-05-04 14:11:41 +02:00 committed by GitHub
parent c0d858af1d
commit 36ff537612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 24 deletions

View File

@ -13,6 +13,8 @@
- Fixed: content filtering now works on any PubSub topic and not just the `waku` default. - Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
- Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`. - Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`.
- Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed. - Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed.
- Fix: allow mounting light protocols without `relay`
- Add `keep-alive` option to maintain stable connection to `relay` peers on idle topics
## 2021-01-05 v0.2 ## 2021-01-05 v0.2

View File

@ -210,6 +210,54 @@ procSuite "WakuNode":
await node1.stop() await node1.stop()
await node2.stop() await node2.stop()
asyncTest "Filter protocol works on node without relay capability":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
defaultTopic = "/waku/2/default-waku/proto"
contentTopic = "defaultCT"
payload = @[byte 1]
message = WakuMessage(payload: payload, contentTopic: contentTopic)
filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
await node1.start()
node1.mountRelay()
node1.mountFilter()
await node2.start()
node2.mountRelay(relayMessages=false) # Do not start WakuRelay or subscribe to any topics
node2.mountFilter()
node2.wakuFilter.setPeer(node1.peerInfo)
check:
node1.wakuRelay.isNil == false # Node1 is a full node
node2.wakuRelay.isNil == true # Node 2 is a light node
var completeFut = newFuture[bool]()
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == payload
msg.contentTopic == contentTopic
completeFut.complete(true)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node2.subscribe(filterRequest, filterHandler)
await sleepAsync(2000.millis)
# Let's check that content filtering works on the default topic
await node1.publish(defaultTopic, message)
check:
(await completeFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Store protocol returns expected message": asyncTest "Store protocol returns expected message":
let let

View File

@ -367,20 +367,6 @@ proc mountFilter*(node: WakuNode) =
node.filters.notify(message, requestId) # Trigger filter handlers on a light node node.filters.notify(message, requestId) # Trigger filter handlers on a light node
waku_node_messages.inc(labelValues = ["filter"]) waku_node_messages.inc(labelValues = ["filter"])
if node.wakuRelay.isNil:
debug "light node: mounting relay without starting"
## WakuFilter currently requires WakuRelay to be mounted in order to work.
## This is to allow protocol stream negotation with full nodes to succeed.
## Here we mount relay on the switch only, but do not subscribe to any pubsub
## topics. We also never start the relay protocol.
## @TODO: remove WakuRelay dependency
node.switch.mount(WakuRelay.init(
switch = node.switch,
triggerSelf = true,
sign = false,
verifySignature = false
))
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
node.switch.mount(node.wakuFilter) node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
@ -455,7 +441,11 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
let pb = PubSub(node.wakuRelay) let pb = PubSub(node.wakuRelay)
pb.addValidator(pubsubTopic, validator) pb.addValidator(pubsubTopic, validator)
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled = false, keepAlive = false) {.gcsafe.} = proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
rlnRelayEnabled = false,
keepAlive = false,
relayMessages = true) {.gcsafe.} =
let wakuRelay = WakuRelay.init( let wakuRelay = WakuRelay.init(
switch = node.switch, switch = node.switch,
# Use default # Use default
@ -464,13 +454,22 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
sign = false, sign = false,
verifySignature = false verifySignature = false
) )
info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, keepAlive=keepAlive, relayMessages=relayMessages
wakuRelay.keepAlive = keepAlive
node.wakuRelay = wakuRelay
node.switch.mount(wakuRelay) node.switch.mount(wakuRelay)
info "mounting relay" if not relayMessages:
## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes).
## All nodes, however, currently require WakuRelay, regardless of desired capabilities.
## This is to allow protocol stream negotation with relay-capable nodes to succeed.
## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub
## topics. We also never start the relay protocol. node.wakuRelay remains nil.
## @TODO: in future, this WakuRelay dependency will be removed completely
return
node.wakuRelay = wakuRelay
wakuRelay.keepAlive = keepAlive
node.subscribe(defaultTopic, none(TopicHandler)) node.subscribe(defaultTopic, none(TopicHandler))
@ -510,7 +509,6 @@ proc mountLightPush*(node: WakuNode) =
node.switch.mount(node.wakuLightPush) node.switch.mount(node.wakuLightPush)
## Helpers ## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} = proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address
@ -693,11 +691,14 @@ when isMainModule:
setStorePeer(node, conf.storenode) setStorePeer(node, conf.storenode)
# Relay setup # Relay setup
if conf.relay: # True by default mountRelay(node,
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay, keepAlive = conf.keepAlive) conf.topics.split(" "),
rlnRelayEnabled = conf.rlnrelay,
keepAlive = conf.keepAlive,
relayMessages = conf.relay) # Indicates if node is capable to relay messages
if conf.staticnodes.len > 0: if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes) waitFor connectToNodes(node, conf.staticnodes)
# NOTE Must be mounted after relay # NOTE Must be mounted after relay
if conf.lightpush: if conf.lightpush: