deploy: 17cfc32c4fda5054d82e00710d46b21e54d792ed

This commit is contained in:
jm-clius 2021-04-13 06:17:40 +00:00
parent ed60ccc1c8
commit 6d43998886
4 changed files with 106 additions and 18 deletions

View File

@ -9,6 +9,8 @@
- `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets. - `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets.
- Admin API now provides a `post` method to connect to peers on an ad-hoc basis - Admin API now provides a `post` method to connect to peers on an ad-hoc basis
- Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart. - Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart.
- Changed `contentTopic` back to a string
- Fixed: content filtering now works on any PubSub topic and not just the `waku` default.
## 2021-01-05 v0.2 ## 2021-01-05 v0.2

View File

@ -132,6 +132,82 @@ procSuite "WakuNode":
(await completionFut.withTimeout(5.seconds)) == true (await completionFut.withTimeout(5.seconds)) == true
await node1.stop() await node1.stop()
await node2.stop() await node2.stop()
asyncTest "Can receive filtered messages published on both default and other topics":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
defaultTopic = "/waku/2/default-waku/proto"
otherTopic = "/non/waku/formatted"
defaultContentTopic = "defaultCT"
otherContentTopic = "otherCT"
defaultPayload = @[byte 1]
otherPayload = @[byte 9]
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
defaultFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[defaultContentTopic])], subscribe: true)
otherFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[otherContentTopic])], subscribe: true)
await node1.start()
node1.mountRelay()
node1.mountFilter()
await node2.start()
node2.mountRelay()
node2.mountFilter()
node2.wakuFilter.setPeer(node1.peerInfo)
var defaultComplete = newFuture[bool]()
var otherComplete = newFuture[bool]()
# Subscribe nodes 1 and 2 to otherTopic
proc emptyHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# Do not notify filters or subscriptions here. This should be default behaviour for all topics
discard
node1.subscribe(otherTopic, emptyHandler)
node2.subscribe(otherTopic, emptyHandler)
await sleepAsync(2000.millis)
proc defaultHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == defaultPayload
msg.contentTopic == defaultContentTopic
defaultComplete.complete(true)
proc otherHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg.payload == otherPayload
msg.contentTopic == otherContentTopic
otherComplete.complete(true)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node2.subscribe(defaultFR, defaultHandler)
await sleepAsync(2000.millis)
# Let's check that content filtering works on the default topic
await node1.publish(defaultTopic, defaultMessage)
check:
(await defaultComplete.withTimeout(5.seconds)) == true
# Now check that content filtering works on other topics
await node2.subscribe(otherFR, otherHandler)
await sleepAsync(2000.millis)
await node1.publish(otherTopic,otherMessage)
check:
(await otherComplete.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Store protocol returns expected message": asyncTest "Store protocol returns expected message":
let let

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az182-10: # Libtool was configured on host fv-az269-370:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -176,16 +176,37 @@ proc stop*(node: WakuNode) {.async.} =
node.started = false node.started = false
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
info "subscribe", topic=topic
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# A default handler should be registered for all topics
trace "Hit default handler", topic=topic, data=data
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "") # Trigger filter handlers on a light node
await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node
waku_node_messages.inc(labelValues = ["relay"])
let wakuRelay = node.wakuRelay
if topic notin PubSub(wakuRelay).topics:
# Add default handler only for new topics
debug "Registering default handler", topic=topic
wakuRelay.subscribe(topic, defaultHandler)
if handler.isSome:
debug "Registering handler", topic=topic
wakuRelay.subscribe(topic, handler.get())
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data. ## this topic. TopicHandler is a method that takes a topic and some data.
## ##
## NOTE The data field SHOULD be decoded as a WakuMessage. ## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented. ## Status: Implemented.
info "subscribe", topic=topic node.subscribe(topic, some(handler))
let wakuRelay = node.wakuRelay
wakuRelay.subscribe(topic, handler)
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
@ -369,7 +390,6 @@ proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(stri
node.wakuRlnRelay = rlnPeer node.wakuRlnRelay = rlnPeer
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) = proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
## this procedure is a thin wrapper for the pubsub addValidator method ## this procedure is a thin wrapper for the pubsub addValidator method
## it sets message validator on the given pubsubTopic, the validator will check that ## it sets message validator on the given pubsubTopic, the validator will check that
@ -401,20 +421,11 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
waitFor node.peerManager.reconnectPeers(WakuRelayCodec) waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
info "mounting relay" info "mounting relay"
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
node.wakuRelay.subscribe(defaultTopic, relayHandler) node.subscribe(defaultTopic, none(TopicHandler))
for topic in topics: for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = node.subscribe(topic, none(TopicHandler))
debug "Hit handler", topic=topic, data=data
node.subscribe(topic, handler)
if rlnRelayEnabled: if rlnRelayEnabled:
# TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
@ -431,7 +442,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "relay mounted and started successfully" info "relay mounted and started successfully"
## Helpers ## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} = proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address