Add default handler for all topics (#483)

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Hanno Cornelius 2021-04-13 07:56:49 +02:00 committed by GitHub
parent d78f3c5669
commit 17cfc32c4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 17 deletions

View File

@ -9,6 +9,8 @@
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
- Admin API now provides a `post` method to connect to peers on an ad-hoc basis
- Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
- Changed `contentTopic` back to a string
- Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
## 2021-01-05 v0.2

View File

@ -132,6 +132,82 @@ procSuite "WakuNode":
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Can receive filtered messages published on both default and other topics":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
defaultTopic = "/waku/2/default-waku/proto"
otherTopic = "/non/waku/formatted"
defaultContentTopic = "defaultCT"
otherContentTopic = "otherCT"
defaultPayload = @[byte 1]
otherPayload = @[byte 9]
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
defaultFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[defaultContentTopic])], subscribe: true)
otherFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[otherContentTopic])], subscribe: true)
await node1.start()
node1.mountRelay()
node1.mountFilter()
await node2.start()
node2.mountRelay()
node2.mountFilter()
node2.wakuFilter.setPeer(node1.peerInfo)
var defaultComplete = newFuture[bool]()
var otherComplete = newFuture[bool]()
# Subscribe nodes 1 and 2 to otherTopic
proc emptyHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# Do not notify filters or subscriptions here. This should be default behaviour for all topics
discard
node1.subscribe(otherTopic, emptyHandler)
node2.subscribe(otherTopic, emptyHandler)
await sleepAsync(2000.millis)
proc defaultHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == defaultPayload
msg.contentTopic == defaultContentTopic
defaultComplete.complete(true)
proc otherHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == otherPayload
msg.contentTopic == otherContentTopic
otherComplete.complete(true)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node2.subscribe(defaultFR, defaultHandler)
await sleepAsync(2000.millis)
# Let's check that content filtering works on the default topic
await node1.publish(defaultTopic, defaultMessage)
check:
(await defaultComplete.withTimeout(5.seconds)) == true
# Now check that content filtering works on other topics
await node2.subscribe(otherFR, otherHandler)
await sleepAsync(2000.millis)
await node1.publish(otherTopic,otherMessage)
check:
(await otherComplete.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Store protocol returns expected message":
let

View File

@ -176,16 +176,37 @@ proc stop*(node: WakuNode) {.async.} =
node.started = false
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
info "subscribe", topic=topic
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# A default handler should be registered for all topics
trace "Hit default handler", topic=topic, data=data
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "") # Trigger filter handlers on a light node
await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node
waku_node_messages.inc(labelValues = ["relay"])
let wakuRelay = node.wakuRelay
if topic notin PubSub(wakuRelay).topics:
# Add default handler only for new topics
debug "Registering default handler", topic=topic
wakuRelay.subscribe(topic, defaultHandler)
if handler.isSome:
debug "Registering handler", topic=topic
wakuRelay.subscribe(topic, handler.get())
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented.
info "subscribe", topic=topic
let wakuRelay = node.wakuRelay
wakuRelay.subscribe(topic, handler)
node.subscribe(topic, some(handler))
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
@ -369,7 +390,6 @@ proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(stri
node.wakuRlnRelay = rlnPeer
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets message validator on the given pubsubTopic, the validator will check that
@ -401,20 +421,11 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
info "mounting relay"
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
node.wakuRelay.subscribe(defaultTopic, relayHandler)
node.subscribe(defaultTopic, none(TopicHandler))
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
node.subscribe(topic, handler)
node.subscribe(topic, none(TopicHandler))
if rlnRelayEnabled:
# TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
@ -431,7 +442,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "relay mounted and started successfully"
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address