Bump submodules (#363)

* Bump submodules

* GossipSub refactor: Rem async from sub/unsub
This commit is contained in:
Hanno Cornelius 2021-02-02 13:33:59 +02:00 committed by GitHub
parent 886b458ff5
commit 41fe4395a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 80 additions and 102 deletions

View File

@ -27,7 +27,7 @@ proc start*(node: WakuNode) {.async.} =
##
## Status: Implemented.
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
@ -40,12 +40,12 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
##
## Status: Implemented.
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Unsubscribes a handler from a PubSub topic.
##
## Status: Implemented.
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.

View File

@ -23,7 +23,7 @@ proc runBackground() {.async.} =
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
await node.start()
await node.mountRelay(rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(rlnRelayEnabled = conf.rlnrelay)
# Subscribe to a topic
let topic = cast[Topic]("foobar")
@ -31,7 +31,7 @@ proc runBackground() {.async.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
await node.subscribe(topic, handler)
node.subscribe(topic, handler)
# Publish to a topic
let payload = cast[seq[byte]]("hello world")

View File

@ -176,9 +176,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start()
if conf.filternode != "":
await node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
else:
await node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay)
node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay)
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
@ -251,7 +251,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
trace "Invalid encoded WakuMessage", error = decoded.error
let topic = cast[Topic](DefaultTopic)
await node.subscribe(topic, handler)
node.subscribe(topic, handler)
await chat.readWriteLoop()
runForever()

View File

@ -42,7 +42,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Debug API: get node info":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
# RPC server setup
let
@ -68,7 +68,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Relay API: publish and subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
# RPC server setup
let
@ -128,13 +128,13 @@ procSuite "Waku v2 JSON-RPC API":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -188,7 +188,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Store API: retrieve historical messages":
waitFor node.start()
waitFor node.mountRelay(@[defaultTopic])
node.mountRelay(@[defaultTopic])
# RPC server setup
let
@ -249,7 +249,7 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Filter API: subscribe/unsubscribe":
waitFor node.start()
waitFor node.mountRelay()
node.mountRelay()
node.mountFilter()
@ -434,13 +434,13 @@ procSuite "Waku v2 JSON-RPC API":
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -524,13 +524,13 @@ procSuite "Waku v2 JSON-RPC API":
topicCache = newTable[string, seq[WakuMessage]]()
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])

View File

@ -27,7 +27,7 @@ suite "Waku v2 Remote Procedure Calls":
waitFor node.start()
waitFor node.mountRelay(@["waku"])
node.mountRelay(@["waku"])
# RPC server setup
let

View File

@ -85,7 +85,7 @@ procSuite "FloodSub":
)
for node in nodes:
await node.mountRelay()
node.mountRelay()
await subscribeNodes(nodes)

View File

@ -49,10 +49,10 @@ procSuite "WakuNode":
await node.start()
await node.mountRelay()
node.mountRelay()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node.subscribe(pubSubTopic, relayHandler)
node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
@ -101,14 +101,14 @@ procSuite "WakuNode":
await allFutures([node1.start(), node2.start()])
await node1.mountRelay()
await node2.mountRelay()
node1.mountRelay()
node2.mountRelay()
node1.mountFilter()
node2.mountFilter()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.wakuFilter.setPeer(node2.peerInfo)
@ -221,13 +221,13 @@ procSuite "WakuNode":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
node1.mountRelay(@[pubSubTopic])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
node2.mountRelay(@[pubSubTopic])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
node3.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo])
await node3.connectToNodes(@[node2.peerInfo])
@ -243,7 +243,7 @@ procSuite "WakuNode":
val.payload == payload
completionFut.complete(true)
await node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message)

2
vendor/nim-bearssl vendored

@ -1 +1 @@
Subproject commit ba5f4687987817902c2727e30b35cb5ad1e61203
Subproject commit eebf730ccda5b5fade2a8f48b3da1496f2c47ba5

@ -1 +1 @@
Subproject commit b7150d195beff7d018d47a36a2ea19630537cd8b
Subproject commit b42899070a7daa5cf6f0843faf3d6d41659e9591

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit ac9b3e304f630a450efc996f47dc9e6133246a87
Subproject commit c1f6e7276e3810cee4bf7358a36a3444fb4bd75e

@ -1 +1 @@
Subproject commit 10de7aa44330eb9dda1b9503e18ecbaabfbd6da1
Subproject commit cfa95661913b0ff8b1609e3954894f8ab31bbf3e

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 114680453f789f947a3c057bbeb5bdf593f5364b
Subproject commit 4e58eb48ce1b1a06d28802ad22215de9ff92a916

@ -1 +1 @@
Subproject commit 87309f3120d4e627082171a188324d3ee14d8986
Subproject commit 9138f6a8bb9a089790319753895722c2569707b0

@ -1 +1 @@
Subproject commit 33d70b9f378591e074838d6608a4468940137357
Subproject commit 1639ac79035c291688174a2037f7d695aa3ff6f3

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit dc3a2d33fc49e8415323da146b60b5c7e07afd2c
Subproject commit ded863fcb1741483f285e3bbf27541efdc32c78c

@ -1 +1 @@
Subproject commit 32f75d93b0762328d1d85ce62cef84ed919ae31e
Subproject commit 7999d2522565d88499b9d7f99c4175a8eb3f2b41

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 7b1e652224c3d9aeb69923b557daba04f3742a14
Subproject commit 5aebf0990e5315a0349e2414e7bc11da96f703bc

2
vendor/nim-metrics vendored

@ -1 +1 @@
Subproject commit 22a3867341f7b0a9d55661b41d7ee5febe35c86b
Subproject commit 16ec7aeccc2020666d6b7c63288153f3917222fb

@ -1 +1 @@
Subproject commit 24ba989cfcec00d16d7a9bf5e5691c77b3c9295f
Subproject commit 8994b67b07813955c61bebddf4bd2325439c3535

@ -1 +1 @@
Subproject commit a9d5cba699a0ee636ad155ea0dc49747b24d2ea4
Subproject commit ac96054870eb7cbc2cf72dd03fe5d5336b2e5dd9

@ -1 +1 @@
Subproject commit 474bdbf49cf1634ba504888ad1a1927a2703bd3f
Subproject commit 4e2ffe3f6df5b753d7b11fef83cc5ee14f296c1c

@ -1 +1 @@
Subproject commit 068ff3593c1582bf3d96b75dcf40fa72e3739b29
Subproject commit 07039dd887c4e5b57367a16f4be3c18763be1d7b

2
vendor/nim-stew vendored

@ -1 +1 @@
Subproject commit ff524ed832b9933760a5c500252323ec840951a6
Subproject commit 6d3e6a21caf4110d0d432b82f14e41e0271cd76b

2
vendor/nim-stint vendored

@ -1 +1 @@
Subproject commit 9e49b00148884a01d61478ae5d2c69b543b93ceb
Subproject commit bae3fc6ee4f36a33849c9dafe5a52b9b6cdf672a

@ -1 +1 @@
Subproject commit c8a1339b2b58bac086d37d7e3b42c105d5925e69
Subproject commit 9c4c4d3acf8f13e8d973180548837cc738e14aa2

View File

@ -84,7 +84,7 @@ proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
mountFilter(node)
if config.relay:
waitFor mountRelay(node, config.topics.split(" "))
mountRelay(node, config.topics.split(" "))
if config.staticnodesv2.len > 0:
waitFor connectToNodes(node, config.staticnodesv2)

View File

@ -70,45 +70,26 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top
rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Subscribes a node to a list of PubSub topics
debug "post_waku_v2_relay_v1_subscriptions"
var failedTopics: seq[string]
# Subscribe to all requested topics
for topic in topics:
if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)):
# If any topic fails to subscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
if (failedTopics.len() == 0):
# Successfully subscribed to all requested topics
return true
else:
# Failed to subscribe to one or more topics
raise newException(ValueError, "Failed to subscribe to topics " & repr(failedTopics))
node.subscribe(topic, topicHandler)
# Create message cache for this topic
debug "MessageCache for topic", topic=topic
topicCache[topic] = @[]
# Successfully subscribed to all requested topics
return true
rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool:
## Unsubscribes a node from a list of PubSub topics
debug "delete_waku_v2_relay_v1_subscriptions"
var failedTopics: seq[string]
# Unsubscribe all handlers from requested topics
for topic in topics:
if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)):
# If any topic fails to unsubscribe, add to list of failedTopics
failedTopics.add(topic)
else:
# Remove message cache for topic
topicCache.del(topic)
node.unsubscribeAll(topic)
# Remove message cache for topic
topicCache.del(topic)
if (failedTopics.len() == 0):
# Successfully unsubscribed from all requested topics
return true
else:
# Failed to unsubscribe from one or more topics
raise newException(ValueError, "Failed to unsubscribe from topics " & repr(failedTopics))
# Successfully unsubscribed from all requested topics
return true

View File

@ -53,8 +53,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
warn "waku_subscribe decode error", msg=msg
info "waku_subscribe raw data string", str=cast[string](data)
# XXX: Can we make this context async to use await?
discard node.subscribe(topic, handler)
node.subscribe(topic, handler)
return true
#if not result:
# raise newException(ValueError, "Message could not be posted")

View File

@ -157,7 +157,7 @@ proc stop*(node: WakuNode) {.async.} =
await node.switch.stop()
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
@ -166,7 +166,7 @@ proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
info "subscribe", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.subscribe(topic, handler)
wakuRelay.subscribe(topic, handler)
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
@ -187,23 +187,23 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
## Unsubscribes a handler from a PubSub topic.
##
## Status: Implemented.
info "unsubscribe", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribe(@[(topic, handler)])
wakuRelay.unsubscribe(@[(topic, handler)])
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
info "unsubscribeAll", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribeAll(topic)
wakuRelay.unsubscribeAll(topic)
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
@ -299,7 +299,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) =
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.async, gcsafe.} =
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.gcsafe.} =
# TODO add the RLN registration
let wakuRelay = WakuRelay.init(
switch = node.switch,
@ -328,15 +328,13 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
# XXX: Is using discard here fine? Not sure if we want init to be async?
# Can also move this to the start proc, possibly wiser?
discard node.subscribe(topic, handler)
node.subscribe(topic, handler)
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
@ -495,7 +493,7 @@ when isMainModule:
mountFilter(node)
if conf.relay:
waitFor mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay)
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)

View File

@ -47,10 +47,10 @@ method initPubSub*(w: WakuRelay) =
method subscribe*(w: WakuRelay,
pubSubTopic: string,
handler: TopicHandler) {.async.} =
handler: TopicHandler) =
debug "subscribe", pubSubTopic=pubSubTopic
await procCall GossipSub(w).subscribe(pubSubTopic, handler)
procCall GossipSub(w).subscribe(pubSubTopic, handler)
method publish*(w: WakuRelay,
pubSubTopic: string,
@ -61,16 +61,16 @@ method publish*(w: WakuRelay,
return await procCall GossipSub(w).publish(pubSubTopic, message)
method unsubscribe*(w: WakuRelay,
topics: seq[TopicPair]) {.async.} =
topics: seq[TopicPair]) =
debug "unsubscribe"
await procCall GossipSub(w).unsubscribe(topics)
procCall GossipSub(w).unsubscribe(topics)
method unsubscribeAll*(w: WakuRelay,
pubSubTopic: string) {.async.} =
pubSubTopic: string) =
debug "unsubscribeAll"
await procCall GossipSub(w).unsubscribeAll(pubSubTopic)
procCall GossipSub(w).unsubscribeAll(pubSubTopic)
# GossipSub specific methods --------------------------------------------------
method start*(w: WakuRelay) {.async.} =