diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 2469a6380..939e6c145 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -15,7 +15,8 @@ import ./v2/test_waku_bridge, ./v2/test_peer_storage, ./v2/test_waku_keepalive, - ./v2/test_migration_utils + ./v2/test_migration_utils, + ./v2/test_namespacing_utils when defined(rln): import ./v2/test_waku_rln_relay diff --git a/tests/v2/test_namespacing_utils.nim b/tests/v2/test_namespacing_utils.nim new file mode 100644 index 000000000..79d757d3f --- /dev/null +++ b/tests/v2/test_namespacing_utils.nim @@ -0,0 +1,47 @@ +{.used.} + +import + testutils/unittests, + chronos, + stew/results, + ../../waku/v2/utils/namespacing + +procSuite "Namespacing utils": + + asyncTest "Create from string": + # Expected case + let ns = NamespacedTopic.fromString("/waku/2/default-waku/proto").tryGet() + + check: + ns.application == "waku" + ns.version == "2" + ns.topicName == "default-waku" + ns.encoding == "proto" + + # Invalid cases + expect ValueError: + # Topic should be namespaced + discard NamespacedTopic.fromString("this-is-not-namespaced").tryGet() + + expect ValueError: + # Topic should start with '/' + discard NamespacedTopic.fromString("waku/2/default-waku/proto").tryGet() + + expect ValueError: + # Topic has too few parts + discard NamespacedTopic.fromString("/waku/2/default-waku").tryGet() + + expect ValueError: + # Topic has too many parts + discard NamespacedTopic.fromString("/waku/2/default-waku/proto/2").tryGet() + + asyncTest "Stringify namespaced topic": + var ns = NamespacedTopic() + + ns.application = "waku" + ns.version = "2" + ns.topicName = "default-waku" + ns.encoding = "proto" + + check: + $ns == "/waku/2/default-waku/proto" diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index fdf0ac3fe..6f7e28e3f 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -50,8 +50,8 @@ procSuite "WakuBridge": v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) - contentTopic = ContentTopic("0001") - topic = toArray(4, contentTopic.toBytes()[0..3]) + contentTopic = ContentTopic("/waku/1/1a2b3c4d/rlp") + topic = [byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d] payloadV1 = "hello from V1".toBytes() payloadV2 = "hello from V2".toBytes() message = WakuMessage(payload: payloadV2, contentTopic: contentTopic) @@ -60,8 +60,44 @@ procSuite "WakuBridge": # Tests setup/teardown # ######################## - setup: - # Runs before each test + # setup: + # # Runs before each test + + # teardown: + # # Runs after each test + + ############### + # Suite tests # + ############### + + asyncTest "Topics are correctly converted between Waku v1 and Waku v2": + # Expected cases + + check: + toV1Topic(ContentTopic("/waku/1/00000000/rlp")) == [byte 0x00, byte 0x00, byte 0x00, byte 0x00] + toV2ContentTopic([byte 0x00, byte 0x00, byte 0x00, byte 0x00]) == ContentTopic("/waku/1/00000000/rlp") + toV1Topic(ContentTopic("/waku/1/ffffffff/rlp")) == [byte 0xff, byte 0xff, byte 0xff, byte 0xff] + toV2ContentTopic([byte 0xff, byte 0xff, byte 0xff, byte 0xff]) == ContentTopic("/waku/1/ffffffff/rlp") + toV1Topic(ContentTopic("/waku/1/1a2b3c4d/rlp")) == [byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d] + toV2ContentTopic([byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]) == ContentTopic("/waku/1/1a2b3c4d/rlp") + + # Invalid cases + + expect ValueError: + # Content topic not namespaced + discard toV1Topic(ContentTopic("this-is-my-content")) + + expect ValueError: + # Content topic name too short + discard toV1Topic(ContentTopic("/waku/1/112233/rlp")) + + expect ValueError: + # Content topic name not hex + discard toV1Topic(ContentTopic("/waku/1/my-content/rlp")) + + asyncTest "Messages are bridged between Waku v1 and Waku v2": + # Setup test + waitFor bridge.start() waitFor v2Node.start() @@ -69,18 +105,7 @@ procSuite "WakuBridge": discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo]) - - teardown: - # Runs after each test - bridge.nodeV1.resetMessageQueue() - v1Node.resetMessageQueue() - waitFor allFutures([bridge.stop(), v2Node.stop()]) - ############### - # Suite tests # - ############### - - asyncTest "Messages are bridged between Waku v1 and Waku v2": var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -133,3 +158,9 @@ procSuite "WakuBridge": check: # v1Node did not receive duplicate of previous message v1Node.protocolState(Waku).queue.items.len == 0 + + # Teardown test + + bridge.nodeV1.resetMessageQueue() + v1Node.resetMessageQueue() + waitFor allFutures([bridge.stop(), v2Node.stop()]) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index 9c6fde41b..4f4d91971 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -10,6 +10,7 @@ import ../v1/protocol/waku_protocol, # Waku v2 imports libp2p/crypto/crypto, + ../v2/utils/namespacing, ../v2/protocol/waku_filter/waku_filter_types, ../v2/node/wakunode2, # Common cli config @@ -45,6 +46,8 @@ type # Helper funtions # ################### +# Deduplication + proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool = if sequence.contains(hash): return true @@ -57,10 +60,34 @@ proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool = return false +# Topic conversion + +proc toV2ContentTopic*(v1Topic: waku_protocol.Topic): ContentTopic = + ## Convert a 4-byte array v1 topic to a namespaced content topic + ## with format `/waku/1//proto` + + var namespacedTopic = NamespacedTopic() + + namespacedTopic.application = "waku" + namespacedTopic.version = "1" + namespacedTopic.topicName = v1Topic.toHex() + namespacedTopic.encoding = "rlp" + + return ContentTopic($namespacedTopic) + +proc toV1Topic*(contentTopic: ContentTopic): waku_protocol.Topic {.raises: [ValueError, Defect]} = + ## Extracts the 4-byte array v1 topic from a content topic + ## with format `/waku/1//proto` + + hexToByteArray(hexStr = NamespacedTopic.fromString(contentTopic).tryGet().topicName, + N = 4) # Byte array length + +# Message conversion + func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, - contentTopic: ContentTopic(string.fromBytes(env.topic)), + contentTopic: toV2ContentTopic(env.topic), version: 1) proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = @@ -78,7 +105,7 @@ proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = await bridge.nodev2.publish(bridge.nodev2PubsubTopic, msg) -proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} = +proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [ValueError, Defect].} = if bridge.seen.containsOrAdd(msg.encode().buffer.hash()): # This is a duplicate message. Return trace "Already seen. Dropping.", msg=msg @@ -93,7 +120,7 @@ proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} = let v1TopicSeq = msg.contentTopic.toBytes()[0..3] discard bridge.nodev1.postMessage(ttl = DefaultTTL, - topic = toArray(4, v1TopicSeq), + topic = toV1Topic(msg.contentTopic), payload = msg.payload) ############## @@ -177,8 +204,12 @@ proc start*(bridge: WakuBridge) {.async.} = proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) if msg.isOk(): - trace "Bridging message from V2 to V1", msg=msg[] - bridge.toWakuV1(msg[]) + try: + trace "Bridging message from V2 to V1", msg=msg.tryGet() + bridge.toWakuV1(msg.tryGet()) + except ValueError: + trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg + waku_bridge_dropped.inc(labelValues = ["value_error"]) bridge.nodev2.subscribe(bridge.nodev2PubsubTopic, relayHandler) diff --git a/waku/v2/utils/namespacing.nim b/waku/v2/utils/namespacing.nim new file mode 100644 index 000000000..fc183ea12 --- /dev/null +++ b/waku/v2/utils/namespacing.nim @@ -0,0 +1,54 @@ +## Collection of utilities related to namespaced topics +## Implemented according to the specified Waku v2 Topic Usage Recommendations +## More at https://rfc.vac.dev/spec/23/ + +{.push raises: [Defect]} + +import + std/strutils, + stew/results + +type + NamespacedTopic* = object + application*: string + version*: string + topicName*: string + encoding*: string + + NamespacingResult*[T] = Result[T, string] + +proc fromString*(T: type NamespacedTopic, topic: string): NamespacingResult[NamespacedTopic] = + ## Splits a namespaced topic string into its constituent parts. + ## The topic string has to be in the format `////` + + let parts = topic.split('/') + + if parts.len != 5: + # Check that we have an expected number of substrings + return err("invalid topic format") + + if parts[0] != "": + # Ensures that topic starts with a "/" + return err("invalid topic format") + + ok(NamespacedTopic(application: parts[1], + version: parts[2], + topicName: parts[3], + encoding: parts[4])) + +proc `$`*(namespacedTopic: NamespacedTopic): string = + ## Returns a string representation of a namespaced topic + ## in the format `////` + + var topicStr = newString(0) + + topicStr.add("/") + topicStr.add(namespacedTopic.application) + topicStr.add("/") + topicStr.add(namespacedTopic.version) + topicStr.add("/") + topicStr.add(namespacedTopic.topicName) + topicStr.add("/") + topicStr.add(namespacedTopic.encoding) + + return topicStr