mirror of https://github.com/waku-org/nwaku.git
refactor(relay): improve wakuy relay api
This commit is contained in:
parent
55bac8dedf
commit
00f4ce4cbc
|
@ -221,8 +221,7 @@ procSuite "WakuNode - Relay":
|
|||
require conn.isSome
|
||||
|
||||
# Node 1 subscribes to topic
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
|
||||
nodes[1].subscribe(DefaultPubsubTopic, handler)
|
||||
nodes[1].subscribe(DefaultPubsubTopic)
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
|
||||
|
@ -244,10 +243,10 @@ procSuite "WakuNode - Relay":
|
|||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60510), wsBindPort = Port(8000), wsEnabled = true)
|
||||
bindPort = Port(60510), wsBindPort = Port(8001), wsEnabled = true)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60512), wsBindPort = Port(8100), wsEnabled = true)
|
||||
bindPort = Port(60512), wsBindPort = Port(8101), wsEnabled = true)
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
payload = "hello world".toBytes()
|
||||
|
@ -288,7 +287,7 @@ procSuite "WakuNode - Relay":
|
|||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60520), wsBindPort = Port(8000), wsEnabled = true)
|
||||
bindPort = Port(60520), wsBindPort = Port(8002), wsEnabled = true)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60522))
|
||||
|
@ -335,7 +334,7 @@ procSuite "WakuNode - Relay":
|
|||
bindPort = Port(60530))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60532), wsBindPort = Port(8100), wsEnabled = true)
|
||||
bindPort = Port(60532), wsBindPort = Port(8103), wsEnabled = true)
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
payload = "hello world".toBytes()
|
||||
|
@ -379,7 +378,7 @@ procSuite "WakuNode - Relay":
|
|||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60540), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
||||
bindPort = Port(60540), wsBindPort = Port(8004), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(60542))
|
||||
|
@ -421,9 +420,9 @@ procSuite "WakuNode - Relay":
|
|||
asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8005), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8100), wsEnabled = true )
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8105), wsEnabled = true )
|
||||
|
||||
let
|
||||
pubSubTopic = "test"
|
||||
|
|
|
@ -395,63 +395,71 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
|
|||
|
||||
## Waku relay
|
||||
|
||||
proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]) =
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
# TODO: improved error handling
|
||||
proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
|
||||
if node.wakuRelay.isSubscribed(topic):
|
||||
return
|
||||
|
||||
info "subscribe", topic=topic
|
||||
|
||||
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
# A default handler should be registered for all topics
|
||||
|
||||
let msg = WakuMessage.decode(data)
|
||||
if msg.isErr():
|
||||
# TODO: Add metric to track waku message decode errors
|
||||
return
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
|
||||
trace "waku.relay received",
|
||||
pubsubTopic=topic,
|
||||
hash=MultiHash.digest("sha2-256", data).expect("valid hash").data.buffer.to0xHex(), # TODO: this could be replaced by a message UID
|
||||
receivedTime=getNowInNanosecondTime()
|
||||
|
||||
# Notify mounted protocols of new message
|
||||
if not node.wakuFilter.isNil():
|
||||
await node.wakuFilter.handleMessage(topic, msg.value)
|
||||
|
||||
if not node.wakuArchive.isNil():
|
||||
node.wakuArchive.handleMessage(topic, msg.value)
|
||||
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
|
||||
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuFilter.isNil():
|
||||
return
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
await node.wakuFilter.handleMessage(topic, msg)
|
||||
|
||||
if topic notin PubSub(wakuRelay).topics:
|
||||
# Add default handler only for new topics
|
||||
debug "Registering default handler", topic=topic
|
||||
wakuRelay.subscribe(topic, defaultHandler)
|
||||
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuArchive.isNil():
|
||||
return
|
||||
|
||||
if handler.isSome():
|
||||
debug "Registering handler", topic=topic
|
||||
wakuRelay.subscribe(topic, handler.get())
|
||||
node.wakuArchive.handleMessage(topic, msg)
|
||||
|
||||
proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
|
||||
|
||||
let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.decode(data)
|
||||
if msg.isErr():
|
||||
return
|
||||
|
||||
await traceHandler(topic, data)
|
||||
await filterHandler(topic, msg.value)
|
||||
await archiveHandler(topic, msg.value)
|
||||
|
||||
node.wakuRelay.subscribe(topic, defaultHandler)
|
||||
|
||||
|
||||
proc subscribe*(node: WakuNode, topic: PubsubTopic) =
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
return
|
||||
|
||||
debug "subscribe", pubsubTopic= topic
|
||||
|
||||
node.registerRelayDefaultHandler(topic)
|
||||
|
||||
proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
|
||||
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
|
||||
## this topic. TopicHandler is a method that takes a topic and some data.
|
||||
##
|
||||
## NOTE The data field SHOULD be decoded as a WakuMessage.
|
||||
node.subscribe(topic, some(handler))
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
return
|
||||
|
||||
proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
|
||||
debug "subscribe", pubsubTopic= topic
|
||||
|
||||
node.registerRelayDefaultHandler(topic)
|
||||
node.wakuRelay.subscribe(topic, handler)
|
||||
|
||||
proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
|
||||
## Unsubscribes a handler from a PubSub topic.
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||
# TODO: improved error handling
|
||||
return
|
||||
|
||||
info "unsubscribe", topic=topic
|
||||
debug "unsubscribe", oubsubTopic= topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
wakuRelay.unsubscribe(@[(topic, handler)])
|
||||
|
@ -461,13 +469,12 @@ proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
|
|||
|
||||
if node.wakuRelay.isNil():
|
||||
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
|
||||
# TODO: improved error handling
|
||||
return
|
||||
|
||||
info "unsubscribeAll", topic=topic
|
||||
|
||||
let wakuRelay = node.wakuRelay
|
||||
wakuRelay.unsubscribeAll(topic)
|
||||
node.wakuRelay.unsubscribeAll(topic)
|
||||
|
||||
|
||||
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||
|
@ -491,14 +498,14 @@ proc startRelay*(node: WakuNode) {.async.} =
|
|||
info "starting relay protocol"
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
trace "Failed to start relay. Not mounted."
|
||||
error "Failed to start relay. Not mounted."
|
||||
return
|
||||
|
||||
## Setup relay protocol
|
||||
|
||||
# Subscribe to the default PubSub topics
|
||||
for topic in node.wakuRelay.defaultPubsubTopics:
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
node.subscribe(topic)
|
||||
|
||||
# Resume previous relay connections
|
||||
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
|
|
|
@ -8,6 +8,7 @@ else:
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[sequtils, tables],
|
||||
stew/results,
|
||||
chronos,
|
||||
chronicles,
|
||||
|
@ -32,12 +33,14 @@ type WakuRelayResult*[T] = Result[T, string]
|
|||
|
||||
type
|
||||
PubsubRawHandler* = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
|
||||
SubsciptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
|
||||
SubscriptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
|
||||
|
||||
type
|
||||
WakuRelay* = ref object of GossipSub
|
||||
defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics
|
||||
|
||||
WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler
|
||||
|
||||
|
||||
proc initProtocolHandler(w: WakuRelay) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
|
@ -125,12 +128,19 @@ method stop*(w: WakuRelay) {.async.} =
|
|||
await procCall GossipSub(w).stop()
|
||||
|
||||
|
||||
method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: SubsciptionHandler|PubsubRawHandler) =
|
||||
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
|
||||
GossipSub(w).topics.hasKey(topic)
|
||||
|
||||
iterator subscribedTopics*(w: WakuRelay): lent PubsubTopic =
|
||||
for topic in GossipSub(w).topics.keys():
|
||||
yield topic
|
||||
|
||||
method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
|
||||
debug "subscribe", pubsubTopic=pubsubTopic
|
||||
|
||||
var subsHandler: PubsubRawHandler
|
||||
when handler is SubsciptionHandler:
|
||||
subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} =
|
||||
when handler is SubscriptionHandler:
|
||||
subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe.} =
|
||||
let decodeRes = WakuMessage.decode(data)
|
||||
if decodeRes.isErr():
|
||||
debug "message decode failure", pubsubTopic=pubsubTopic, error=decodeRes.error
|
||||
|
@ -143,7 +153,7 @@ method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: SubsciptionHa
|
|||
procCall GossipSub(w).subscribe(pubsubTopic, subsHandler)
|
||||
|
||||
method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) =
|
||||
debug "unsubscribe", topics=topics
|
||||
debug "unsubscribe", pubsubTopic=topics.mapIt(it[0])
|
||||
|
||||
procCall GossipSub(w).unsubscribe(topics)
|
||||
|
||||
|
|
Loading…
Reference in New Issue