deploy: e04738606f9e658b5183ce5e7baaddd34932daff

This commit is contained in:
jm-clius 2021-02-02 11:46:38 +00:00
parent d8e4a5258f
commit 78300d2301
13 changed files with 63 additions and 84 deletions

1
.update.timestamp Normal file
View File

@ -0,0 +1 @@
1612265639

View File

@ -27,7 +27,7 @@ proc start*(node: WakuNode) {.async.} =
##
## Status: Implemented.
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
@ -40,12 +40,12 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
##
## Status: Implemented.
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Unsubscribes a handler from a PubSub topic.
##
## Status: Implemented.
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.

View File

@ -23,7 +23,7 @@ proc runBackground() {.async.} =
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
await node.start()
await node.mountRelay(rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(rlnRelayEnabled = conf.rlnrelay)
# Subscribe to a topic
let topic = cast[Topic]("foobar")
@ -31,7 +31,7 @@ proc runBackground() {.async.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
await node.subscribe(topic, handler)
node.subscribe(topic, handler)
# Publish to a topic
let payload = cast[seq[byte]]("hello world")

View File

@ -176,9 +176,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start()
if conf.filternode != "":
await node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
else:
await node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay)
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
@ -251,7 +251,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
trace "Invalid encoded WakuMessage", error = decoded.error
let topic = cast[Topic](DefaultTopic)
await node.subscribe(topic, handler)
node.subscribe(topic, handler)
await chat.readWriteLoop()
runForever()

View File

@ -42,7 +42,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Debug API: get node info":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
# RPC server setup
let
@ -68,7 +68,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Relay API: publish and subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
# RPC server setup
let
@ -128,13 +128,13 @@ procSuite "Waku v2 JSON-RPC API":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -188,7 +188,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Store API: retrieve historical messages":
waitFor node.start()
waitFor node.mountRelay(@[defaultTopic])
node.mountRelay(@[defaultTopic])
# RPC server setup
let
@ -249,7 +249,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Filter API: subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
node.mountFilter()
@ -434,13 +434,13 @@ procSuite "Waku v2 JSON-RPC API":
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -524,13 +524,13 @@ procSuite "Waku v2 JSON-RPC API":
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])

View File

@ -27,7 +27,7 @@ suite "Waku v2 Remote Procedure Calls":
waitFor node.start()
waitFor node.mountRelay(@["waku"])
node.mountRelay(@["waku"])
# RPC server setup
let

View File

@ -85,7 +85,7 @@ procSuite "FloodSub":
)
for node in nodes:
await node.mountRelay()
node.mountRelay()
await subscribeNodes(nodes)

View File

@ -49,10 +49,10 @@ procSuite "WakuNode":
await node.start()
await node.mountRelay()
node.mountRelay()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node.subscribe(pubSubTopic, relayHandler)
node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
@ -101,14 +101,14 @@ procSuite "WakuNode":
await allFutures([node1.start(), node2.start()])
await node1.mountRelay()
await node2.mountRelay()
node1.mountRelay()
node2.mountRelay()
node1.mountFilter()
node2.mountFilter()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.wakuFilter.setPeer(node2.peerInfo)
@ -221,13 +221,13 @@ procSuite "WakuNode":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -243,7 +243,7 @@ procSuite "WakuNode":
val.payload == payload
completionFut.complete(true)
await node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message)

View File

@ -84,7 +84,7 @@ proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
mountFilter(node)
if config.relay:
waitFor mountRelay(node, config.topics.split(" "))
mountRelay(node, config.topics.split(" "))
if config.staticnodesv2.len > 0:
waitFor connectToNodes(node, config.staticnodesv2)

View File

@ -70,45 +70,26 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
var failedTopics: seq[string]
# Subscribe to all requested topics
for topic in topics:
if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)):
# If any topic fails to subscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
if (failedTopics.len() == 0):
# Successfully subscribed to all requested topics
return true
else:
# Failed to subscribe to one or more topics
raise newException(ValueError, "Failed to subscribe to topics " & repr(failedTopics))
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Successfully subscribed to all requested topics
return true
rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
var failedTopics: seq[string]
# Unsubscribe all handlers from requested topics
for topic in topics:
if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)):
# If any topic fails to unsubscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Remove message cache for topic
topicCache.del(topic)
node.unsubscribeAll(topic)
# Remove message cache for topic
topicCache.del(topic)
if (failedTopics.len() == 0):
# Successfully unsubscribed from all requested topics
return true
else:
# Failed to unsubscribe from one or more topics
raise newException(ValueError, "Failed to unsubscribe from topics " & repr(failedTopics))
# Successfully unsubscribed from all requested topics
return true

View File

@ -53,8 +53,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
warn "waku_subscribe decode error", msg=msg
info "waku_subscribe raw data string", str=cast[string](data)
# XXX: Can we make this context async to use await?
discard node.subscribe(topic, handler)
node.subscribe(topic, handler)
return true
#if not result:
# raise newException(ValueError, "Message could not be posted")

View File

@ -157,7 +157,7 @@ proc stop*(node: WakuNode) {.async.} =
await node.switch.stop()
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
@ -166,7 +166,7 @@ proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
info "subscribe", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.subscribe(topic, handler)
wakuRelay.subscribe(topic, handler)
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
@ -187,23 +187,23 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Unsubscribes a handler from a PubSub topic.
##
## Status: Implemented.
info "unsubscribe", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribe(@[(topic, handler)])
wakuRelay.unsubscribe(@[(topic, handler)])
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
info "unsubscribeAll", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribeAll(topic)
wakuRelay.unsubscribeAll(topic)
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
@ -299,7 +299,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) =
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.async, gcsafe.} =
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.gcsafe.} =
# TODO add the RLN registration
let wakuRelay = WakuRelay.init(
switch = node.switch,
@ -328,15 +328,13 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
# XXX: Is using discard here fine? Not sure if we want init to be async?
# Can also move this to the start proc, possibly wiser?
discard node.subscribe(topic, handler)
node.subscribe(topic, handler)
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
@ -495,7 +493,7 @@ when isMainModule:
mountFilter(node)
if conf.relay:
waitFor mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)

View File

@ -47,10 +47,10 @@ method initPubSub*(w: WakuRelay) =
method subscribe*(w: WakuRelay,
pubSubTopic: string,
handler: TopicHandler) {.async.} =
handler: TopicHandler) =
debug "subscribe", pubSubTopic=pubSubTopic
await procCall GossipSub(w).subscribe(pubSubTopic, handler)
procCall GossipSub(w).subscribe(pubSubTopic, handler)
method publish*(w: WakuRelay,
pubSubTopic: string,
@ -61,16 +61,16 @@ method publish*(w: WakuRelay,
return await procCall GossipSub(w).publish(pubSubTopic, message)
method unsubscribe*(w: WakuRelay,
topics: seq[TopicPair]) {.async.} =
topics: seq[TopicPair]) =
debug "unsubscribe"
await procCall GossipSub(w).unsubscribe(topics)
procCall GossipSub(w).unsubscribe(topics)
method unsubscribeAll*(w: WakuRelay,
pubSubTopic: string) {.async.} =
pubSubTopic: string) =
debug "unsubscribeAll"
await procCall GossipSub(w).unsubscribeAll(pubSubTopic)
procCall GossipSub(w).unsubscribeAll(pubSubTopic)
# GossipSub specific methods --------------------------------------------------
method start*(w: WakuRelay) {.async.} =