From 00f4ce4cbcaa7ebcda1cc0aacde6318c29dce63b Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 8 Feb 2023 16:09:59 +0100 Subject: [PATCH] refactor(relay): improve wakuy relay api --- tests/v2/test_wakunode_relay.nim | 17 +++--- waku/v2/node/waku_node.nim | 89 +++++++++++++++++--------------- waku/v2/protocol/waku_relay.nim | 20 +++++-- 3 files changed, 71 insertions(+), 55 deletions(-) diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/test_wakunode_relay.nim index c105508c2..80f855ed8 100644 --- a/tests/v2/test_wakunode_relay.nim +++ b/tests/v2/test_wakunode_relay.nim @@ -221,8 +221,7 @@ procSuite "WakuNode - Relay": require conn.isSome # Node 1 subscribes to topic - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard - nodes[1].subscribe(DefaultPubsubTopic, handler) + nodes[1].subscribe(DefaultPubsubTopic) await sleepAsync(500.millis) # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes) @@ -244,10 +243,10 @@ procSuite "WakuNode - Relay": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60510), wsBindPort = Port(8000), wsEnabled = true) + bindPort = Port(60510), wsBindPort = Port(8001), wsEnabled = true) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60512), wsBindPort = Port(8100), wsEnabled = true) + bindPort = Port(60512), wsBindPort = Port(8101), wsEnabled = true) pubSubTopic = "test" contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() @@ -288,7 +287,7 @@ procSuite "WakuNode - Relay": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60520), wsBindPort = Port(8000), wsEnabled = true) + bindPort = Port(60520), wsBindPort = Port(8002), wsEnabled = true) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60522)) @@ -335,7 +334,7 @@ procSuite "WakuNode - Relay": bindPort = Port(60530)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60532), wsBindPort = Port(8100), wsEnabled = true) + bindPort = Port(60532), wsBindPort = Port(8103), wsEnabled = true) pubSubTopic = "test" contentTopic = ContentTopic("/waku/2/default-content/proto") payload = "hello world".toBytes() @@ -379,7 +378,7 @@ procSuite "WakuNode - Relay": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60540), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) + bindPort = Port(60540), wsBindPort = Port(8004), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60542)) @@ -421,9 +420,9 @@ procSuite "WakuNode - Relay": asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8005), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8100), wsEnabled = true ) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8105), wsEnabled = true ) let pubSubTopic = "test" diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index d18357064..3d19bff54 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -395,63 +395,71 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s ## Waku relay -proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]) = - if node.wakuRelay.isNil(): - error "Invalid API call to `subscribe`. WakuRelay not mounted." - # TODO: improved error handling +proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = + if node.wakuRelay.isSubscribed(topic): return - info "subscribe", topic=topic - - proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # A default handler should be registered for all topics - - let msg = WakuMessage.decode(data) - if msg.isErr(): - # TODO: Add metric to track waku message decode errors - return - + proc traceHandler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = trace "waku.relay received", pubsubTopic=topic, hash=MultiHash.digest("sha2-256", data).expect("valid hash").data.buffer.to0xHex(), # TODO: this could be replaced by a message UID receivedTime=getNowInNanosecondTime() - # Notify mounted protocols of new message - if not node.wakuFilter.isNil(): - await node.wakuFilter.handleMessage(topic, msg.value) - - if not node.wakuArchive.isNil(): - node.wakuArchive.handleMessage(topic, msg.value) - waku_node_messages.inc(labelValues = ["relay"]) + proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuFilter.isNil(): + return - let wakuRelay = node.wakuRelay + await node.wakuFilter.handleMessage(topic, msg) - if topic notin PubSub(wakuRelay).topics: - # Add default handler only for new topics - debug "Registering default handler", topic=topic - wakuRelay.subscribe(topic, defaultHandler) + proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + if node.wakuArchive.isNil(): + return - if handler.isSome(): - debug "Registering handler", topic=topic - wakuRelay.subscribe(topic, handler.get()) + node.wakuArchive.handleMessage(topic, msg) -proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = + + let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.decode(data) + if msg.isErr(): + return + + await traceHandler(topic, data) + await filterHandler(topic, msg.value) + await archiveHandler(topic, msg.value) + + node.wakuRelay.subscribe(topic, defaultHandler) + + +proc subscribe*(node: WakuNode, topic: PubsubTopic) = + if node.wakuRelay.isNil(): + error "Invalid API call to `subscribe`. WakuRelay not mounted." + return + + debug "subscribe", pubsubTopic= topic + + node.registerRelayDefaultHandler(topic) + +proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. - ## - ## NOTE The data field SHOULD be decoded as a WakuMessage. - node.subscribe(topic, some(handler)) + if node.wakuRelay.isNil(): + error "Invalid API call to `subscribe`. WakuRelay not mounted." + return -proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = + debug "subscribe", pubsubTopic= topic + + node.registerRelayDefaultHandler(topic) + node.wakuRelay.subscribe(topic, handler) + +proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) = ## Unsubscribes a handler from a PubSub topic. if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." - # TODO: improved error handling return - info "unsubscribe", topic=topic + debug "unsubscribe", oubsubTopic= topic let wakuRelay = node.wakuRelay wakuRelay.unsubscribe(@[(topic, handler)]) @@ -461,13 +469,12 @@ proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." - # TODO: improved error handling return info "unsubscribeAll", topic=topic - let wakuRelay = node.wakuRelay - wakuRelay.unsubscribeAll(topic) + node.wakuRelay.unsubscribeAll(topic) + proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a @@ -491,14 +498,14 @@ proc startRelay*(node: WakuNode) {.async.} = info "starting relay protocol" if node.wakuRelay.isNil(): - trace "Failed to start relay. Not mounted." + error "Failed to start relay. Not mounted." return ## Setup relay protocol # Subscribe to the default PubSub topics for topic in node.wakuRelay.defaultPubsubTopics: - node.subscribe(topic, none(TopicHandler)) + node.subscribe(topic) # Resume previous relay connections if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 142849aca..15cd2670c 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -8,6 +8,7 @@ else: {.push raises: [].} import + std/[sequtils, tables], stew/results, chronos, chronicles, @@ -32,12 +33,14 @@ type WakuRelayResult*[T] = Result[T, string] type PubsubRawHandler* = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} - SubsciptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} + SubscriptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} type WakuRelay* = ref object of GossipSub defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics + WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler + proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = @@ -125,12 +128,19 @@ method stop*(w: WakuRelay) {.async.} = await procCall GossipSub(w).stop() -method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: SubsciptionHandler|PubsubRawHandler) = +proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = + GossipSub(w).topics.hasKey(topic) + +iterator subscribedTopics*(w: WakuRelay): lent PubsubTopic = + for topic in GossipSub(w).topics.keys(): + yield topic + +method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = debug "subscribe", pubsubTopic=pubsubTopic var subsHandler: PubsubRawHandler - when handler is SubsciptionHandler: - subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} = + when handler is SubscriptionHandler: + subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe.} = let decodeRes = WakuMessage.decode(data) if decodeRes.isErr(): debug "message decode failure", pubsubTopic=pubsubTopic, error=decodeRes.error @@ -143,7 +153,7 @@ method subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: SubsciptionHa procCall GossipSub(w).subscribe(pubsubTopic, subsHandler) method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) = - debug "unsubscribe", topics=topics + debug "unsubscribe", pubsubTopic=topics.mapIt(it[0]) procCall GossipSub(w).unsubscribe(topics)