mirror of https://github.com/waku-org/nwaku.git
Feat/bridge topic mapping (#633)
* Use namespaced content topics for bridge
This commit is contained in:
parent
eb49d3148c
commit
2f53c2c60b
|
@ -15,7 +15,8 @@ import
|
|||
./v2/test_waku_bridge,
|
||||
./v2/test_peer_storage,
|
||||
./v2/test_waku_keepalive,
|
||||
./v2/test_migration_utils
|
||||
./v2/test_migration_utils,
|
||||
./v2/test_namespacing_utils
|
||||
|
||||
when defined(rln):
|
||||
import ./v2/test_waku_rln_relay
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
stew/results,
|
||||
../../waku/v2/utils/namespacing
|
||||
|
||||
procSuite "Namespacing utils":
|
||||
|
||||
asyncTest "Create from string":
|
||||
# Expected case
|
||||
let ns = NamespacedTopic.fromString("/waku/2/default-waku/proto").tryGet()
|
||||
|
||||
check:
|
||||
ns.application == "waku"
|
||||
ns.version == "2"
|
||||
ns.topicName == "default-waku"
|
||||
ns.encoding == "proto"
|
||||
|
||||
# Invalid cases
|
||||
expect ValueError:
|
||||
# Topic should be namespaced
|
||||
discard NamespacedTopic.fromString("this-is-not-namespaced").tryGet()
|
||||
|
||||
expect ValueError:
|
||||
# Topic should start with '/'
|
||||
discard NamespacedTopic.fromString("waku/2/default-waku/proto").tryGet()
|
||||
|
||||
expect ValueError:
|
||||
# Topic has too few parts
|
||||
discard NamespacedTopic.fromString("/waku/2/default-waku").tryGet()
|
||||
|
||||
expect ValueError:
|
||||
# Topic has too many parts
|
||||
discard NamespacedTopic.fromString("/waku/2/default-waku/proto/2").tryGet()
|
||||
|
||||
asyncTest "Stringify namespaced topic":
|
||||
var ns = NamespacedTopic()
|
||||
|
||||
ns.application = "waku"
|
||||
ns.version = "2"
|
||||
ns.topicName = "default-waku"
|
||||
ns.encoding = "proto"
|
||||
|
||||
check:
|
||||
$ns == "/waku/2/default-waku/proto"
|
|
@ -50,8 +50,8 @@ procSuite "WakuBridge":
|
|||
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
contentTopic = ContentTopic("0001")
|
||||
topic = toArray(4, contentTopic.toBytes()[0..3])
|
||||
contentTopic = ContentTopic("/waku/1/1a2b3c4d/rlp")
|
||||
topic = [byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]
|
||||
payloadV1 = "hello from V1".toBytes()
|
||||
payloadV2 = "hello from V2".toBytes()
|
||||
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
|
||||
|
@ -60,8 +60,44 @@ procSuite "WakuBridge":
|
|||
# Tests setup/teardown #
|
||||
########################
|
||||
|
||||
setup:
|
||||
# Runs before each test
|
||||
# setup:
|
||||
# # Runs before each test
|
||||
|
||||
# teardown:
|
||||
# # Runs after each test
|
||||
|
||||
###############
|
||||
# Suite tests #
|
||||
###############
|
||||
|
||||
asyncTest "Topics are correctly converted between Waku v1 and Waku v2":
|
||||
# Expected cases
|
||||
|
||||
check:
|
||||
toV1Topic(ContentTopic("/waku/1/00000000/rlp")) == [byte 0x00, byte 0x00, byte 0x00, byte 0x00]
|
||||
toV2ContentTopic([byte 0x00, byte 0x00, byte 0x00, byte 0x00]) == ContentTopic("/waku/1/00000000/rlp")
|
||||
toV1Topic(ContentTopic("/waku/1/ffffffff/rlp")) == [byte 0xff, byte 0xff, byte 0xff, byte 0xff]
|
||||
toV2ContentTopic([byte 0xff, byte 0xff, byte 0xff, byte 0xff]) == ContentTopic("/waku/1/ffffffff/rlp")
|
||||
toV1Topic(ContentTopic("/waku/1/1a2b3c4d/rlp")) == [byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]
|
||||
toV2ContentTopic([byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]) == ContentTopic("/waku/1/1a2b3c4d/rlp")
|
||||
|
||||
# Invalid cases
|
||||
|
||||
expect ValueError:
|
||||
# Content topic not namespaced
|
||||
discard toV1Topic(ContentTopic("this-is-my-content"))
|
||||
|
||||
expect ValueError:
|
||||
# Content topic name too short
|
||||
discard toV1Topic(ContentTopic("/waku/1/112233/rlp"))
|
||||
|
||||
expect ValueError:
|
||||
# Content topic name not hex
|
||||
discard toV1Topic(ContentTopic("/waku/1/my-content/rlp"))
|
||||
|
||||
asyncTest "Messages are bridged between Waku v1 and Waku v2":
|
||||
# Setup test
|
||||
|
||||
waitFor bridge.start()
|
||||
|
||||
waitFor v2Node.start()
|
||||
|
@ -70,17 +106,6 @@ procSuite "WakuBridge":
|
|||
discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
|
||||
waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo])
|
||||
|
||||
teardown:
|
||||
# Runs after each test
|
||||
bridge.nodeV1.resetMessageQueue()
|
||||
v1Node.resetMessageQueue()
|
||||
waitFor allFutures([bridge.stop(), v2Node.stop()])
|
||||
|
||||
###############
|
||||
# Suite tests #
|
||||
###############
|
||||
|
||||
asyncTest "Messages are bridged between Waku v1 and Waku v2":
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
|
@ -133,3 +158,9 @@ procSuite "WakuBridge":
|
|||
check:
|
||||
# v1Node did not receive duplicate of previous message
|
||||
v1Node.protocolState(Waku).queue.items.len == 0
|
||||
|
||||
# Teardown test
|
||||
|
||||
bridge.nodeV1.resetMessageQueue()
|
||||
v1Node.resetMessageQueue()
|
||||
waitFor allFutures([bridge.stop(), v2Node.stop()])
|
||||
|
|
|
@ -10,6 +10,7 @@ import
|
|||
../v1/protocol/waku_protocol,
|
||||
# Waku v2 imports
|
||||
libp2p/crypto/crypto,
|
||||
../v2/utils/namespacing,
|
||||
../v2/protocol/waku_filter/waku_filter_types,
|
||||
../v2/node/wakunode2,
|
||||
# Common cli config
|
||||
|
@ -45,6 +46,8 @@ type
|
|||
# Helper funtions #
|
||||
###################
|
||||
|
||||
# Deduplication
|
||||
|
||||
proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool =
|
||||
if sequence.contains(hash):
|
||||
return true
|
||||
|
@ -57,10 +60,34 @@ proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool =
|
|||
|
||||
return false
|
||||
|
||||
# Topic conversion
|
||||
|
||||
proc toV2ContentTopic*(v1Topic: waku_protocol.Topic): ContentTopic =
|
||||
## Convert a 4-byte array v1 topic to a namespaced content topic
|
||||
## with format `/waku/1/<v1-topic-bytes-as-hex>/proto`
|
||||
|
||||
var namespacedTopic = NamespacedTopic()
|
||||
|
||||
namespacedTopic.application = "waku"
|
||||
namespacedTopic.version = "1"
|
||||
namespacedTopic.topicName = v1Topic.toHex()
|
||||
namespacedTopic.encoding = "rlp"
|
||||
|
||||
return ContentTopic($namespacedTopic)
|
||||
|
||||
proc toV1Topic*(contentTopic: ContentTopic): waku_protocol.Topic {.raises: [ValueError, Defect]} =
|
||||
## Extracts the 4-byte array v1 topic from a content topic
|
||||
## with format `/waku/1/<v1-topic-bytes-as-hex>/proto`
|
||||
|
||||
hexToByteArray(hexStr = NamespacedTopic.fromString(contentTopic).tryGet().topicName,
|
||||
N = 4) # Byte array length
|
||||
|
||||
# Message conversion
|
||||
|
||||
func toWakuMessage(env: Envelope): WakuMessage =
|
||||
# Translate a Waku v1 envelope to a Waku v2 message
|
||||
WakuMessage(payload: env.data,
|
||||
contentTopic: ContentTopic(string.fromBytes(env.topic)),
|
||||
contentTopic: toV2ContentTopic(env.topic),
|
||||
version: 1)
|
||||
|
||||
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
|
||||
|
@ -78,7 +105,7 @@ proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
|
|||
|
||||
await bridge.nodev2.publish(bridge.nodev2PubsubTopic, msg)
|
||||
|
||||
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
|
||||
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [ValueError, Defect].} =
|
||||
if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
|
||||
# This is a duplicate message. Return
|
||||
trace "Already seen. Dropping.", msg=msg
|
||||
|
@ -93,7 +120,7 @@ proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
|
|||
let v1TopicSeq = msg.contentTopic.toBytes()[0..3]
|
||||
|
||||
discard bridge.nodev1.postMessage(ttl = DefaultTTL,
|
||||
topic = toArray(4, v1TopicSeq),
|
||||
topic = toV1Topic(msg.contentTopic),
|
||||
payload = msg.payload)
|
||||
|
||||
##############
|
||||
|
@ -177,8 +204,12 @@ proc start*(bridge: WakuBridge) {.async.} =
|
|||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
trace "Bridging message from V2 to V1", msg=msg[]
|
||||
bridge.toWakuV1(msg[])
|
||||
try:
|
||||
trace "Bridging message from V2 to V1", msg=msg.tryGet()
|
||||
bridge.toWakuV1(msg.tryGet())
|
||||
except ValueError:
|
||||
trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg
|
||||
waku_bridge_dropped.inc(labelValues = ["value_error"])
|
||||
|
||||
bridge.nodev2.subscribe(bridge.nodev2PubsubTopic, relayHandler)
|
||||
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
## Collection of utilities related to namespaced topics
|
||||
## Implemented according to the specified Waku v2 Topic Usage Recommendations
|
||||
## More at https://rfc.vac.dev/spec/23/
|
||||
|
||||
{.push raises: [Defect]}
|
||||
|
||||
import
|
||||
std/strutils,
|
||||
stew/results
|
||||
|
||||
type
|
||||
NamespacedTopic* = object
|
||||
application*: string
|
||||
version*: string
|
||||
topicName*: string
|
||||
encoding*: string
|
||||
|
||||
NamespacingResult*[T] = Result[T, string]
|
||||
|
||||
proc fromString*(T: type NamespacedTopic, topic: string): NamespacingResult[NamespacedTopic] =
|
||||
## Splits a namespaced topic string into its constituent parts.
|
||||
## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>`
|
||||
|
||||
let parts = topic.split('/')
|
||||
|
||||
if parts.len != 5:
|
||||
# Check that we have an expected number of substrings
|
||||
return err("invalid topic format")
|
||||
|
||||
if parts[0] != "":
|
||||
# Ensures that topic starts with a "/"
|
||||
return err("invalid topic format")
|
||||
|
||||
ok(NamespacedTopic(application: parts[1],
|
||||
version: parts[2],
|
||||
topicName: parts[3],
|
||||
encoding: parts[4]))
|
||||
|
||||
proc `$`*(namespacedTopic: NamespacedTopic): string =
|
||||
## Returns a string representation of a namespaced topic
|
||||
## in the format `/<application>/<version>/<topic-name>/<encoding>`
|
||||
|
||||
var topicStr = newString(0)
|
||||
|
||||
topicStr.add("/")
|
||||
topicStr.add(namespacedTopic.application)
|
||||
topicStr.add("/")
|
||||
topicStr.add(namespacedTopic.version)
|
||||
topicStr.add("/")
|
||||
topicStr.add(namespacedTopic.topicName)
|
||||
topicStr.add("/")
|
||||
topicStr.add(namespacedTopic.encoding)
|
||||
|
||||
return topicStr
|
Loading…
Reference in New Issue