diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index c318e07ad..980d605f9 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -22,58 +22,81 @@ import ../test_helpers procSuite "WakuBridge": - let rng = keys.newRng() + ############### + # Suite setup # + ############### + + let + rng = keys.newRng() + + # Bridge + nodev1Key = keys.KeyPair.random(rng[]) + nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[] + bridge = WakuBridge.new( + nodev1Key= nodev1Key, + nodev1Address = localAddress(30303), + powRequirement = 0.002, + rng = rng, + nodev2Key = nodev2Key, + nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000)) + + # Waku v1 node + v1Node = setupTestNode(rng, Waku) + + # Waku v2 node + v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + contentTopic = ContentTopic("0001") + topic = toArray(4, contentTopic.toBytes()[0..3]) + payloadV1 = "hello from V1".toBytes() + payloadV2 = "hello from V2".toBytes() + message = WakuMessage(payload: payloadV2, contentTopic: contentTopic) + + ######################## + # Tests setup/teardown # + ######################## + + setup: + # Runs before each test + waitFor bridge.start() + + waitFor v2Node.start() + v2Node.mountRelay(@[DefaultBridgeTopic]) + + discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) + waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo]) + + teardown: + # Runs after each test + bridge.nodeV1.resetMessageQueue() + v1Node.resetMessageQueue() + waitFor allFutures([bridge.stop(), v2Node.stop()]) + + ############### + # Suite tests # + ############### asyncTest "Messages are bridged between Waku v1 and Waku v2": - let - # Bridge - nodev1Key = keys.KeyPair.random(rng[]) - nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[] - bridge = WakuBridge.new( - nodev1Key= nodev1Key, - nodev1Address = localAddress(30303), - powRequirement = 0.002, - rng = rng, - nodev2Key = nodev2Key, - nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000)) - - # Waku v1 node - v1Node = setupTestNode(rng, Waku) - - # Waku v2 node - v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) - - contentTopic = ContentTopic("0001") - topic = toArray(4, contentTopic.toBytes()[0..3]) - payloadV1 = "hello from V1".toBytes() - payloadV2 = "hello from V2".toBytes() - message = WakuMessage(payload: payloadV2, contentTopic: contentTopic) - - await bridge.start() - - await v2Node.start() - v2Node.mountRelay(@[defaultBridgeTopic]) - - discard await v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) - await v2Node.connectToNodes(@[bridge.nodev2.peerInfo]) - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) + if msg.isOk() and msg.value().version == 1: check: # Message fields are as expected msg.value().contentTopic == contentTopic # Topic translation worked string.fromBytes(msg.value().payload).contains("from V1") + completionFut.complete(true) - v2Node.subscribe(defaultBridgeTopic, relayHandler) + v2Node.subscribe(DefaultBridgeTopic, relayHandler) await sleepAsync(2000.millis) # Test bridging from V2 to V1 - await v2Node.publish(defaultBridgeTopic, message) + await v2Node.publish(DefaultBridgeTopic, message) await sleepAsync(2000.millis) @@ -97,5 +120,13 @@ procSuite "WakuBridge": # v2Node received payload published by v1Node await completionFut.withTimeout(5.seconds) - await bridge.stop() - \ No newline at end of file + # Test filtering of WakuMessage duplicates + v1Node.resetMessageQueue() + + await v2Node.publish(DefaultBridgeTopic, message) + + await sleepAsync(2000.millis) + + check: + # v1Node did not receive duplicate of previous message + v1Node.protocolState(Waku).queue.items.len == 0 diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index ebe8f49ca..42971ddc1 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -1,5 +1,5 @@ import - std/tables, + std/[tables, hashes, sequtils], chronos, confutils, chronicles, chronicles/topics_registry, metrics, stew/[byteutils, objects], stew/shims/net as stewNet, json_rpc/rpcserver, @@ -15,6 +15,7 @@ import ./config_bridge declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"] +declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"] logScope: topics = "wakubridge" @@ -24,9 +25,10 @@ logScope: ################## const - defaultBridgeTopic* = "/waku/2/default-bridge/proto" - clientIdV1 = "nim-waku v1 node" - defaultTTL = 5'u32 + DefaultBridgeTopic* = "/waku/2/default-bridge/proto" + ClientIdV1 = "nim-waku v1 node" + DefaultTTL = 5'u32 + DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue ######### # Types # @@ -36,11 +38,24 @@ type WakuBridge* = ref object of RootObj nodev1*: EthereumNode nodev2*: WakuNode + seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication. ################### # Helper funtions # ################### +proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool = + if sequence.contains(hash): + return true + + if sequence.len >= DeduplQSize: + trace "Deduplication queue full. Removing oldest item." + sequence.delete 0, 0 # Remove first item in queue + + sequence.add(hash) + + return false + func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, @@ -48,17 +63,35 @@ func toWakuMessage(env: Envelope): WakuMessage = version: 1) proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = + let msg = env.toWakuMessage() + + debug "message converted to V2", msg=msg + + if bridge.seen.containsOrAdd(msg.encode().buffer.hash()): + # This is a duplicate message. Return + trace "Already seen. Dropping.", msg=msg + waku_bridge_dropped.inc(labelValues = ["duplicate"]) + return + waku_bridge_transfers.inc(labelValues = ["v1_to_v2"]) - await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage()) + await bridge.nodev2.publish(DefaultBridgeTopic, msg) proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} = + debug "sending message to V1", msg=msg + + if bridge.seen.containsOrAdd(msg.encode().buffer.hash()): + # This is a duplicate message. Return + trace "Already seen. Dropping.", msg=msg + waku_bridge_dropped.inc(labelValues = ["duplicate"]) + return + waku_bridge_transfers.inc(labelValues = ["v2_to_v1"]) # @TODO: use namespacing to map v2 contentTopics to v1 topics let v1TopicSeq = msg.contentTopic.toBytes()[0..3] - discard bridge.nodev1.postMessage(ttl = defaultTTL, + discard bridge.nodev1.postMessage(ttl = DefaultTTL, topic = toArray(4, v1TopicSeq), payload = msg.payload) @@ -79,7 +112,7 @@ proc new*(T: type WakuBridge, # Setup Waku v1 node var nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address, - networkId = NetworkId(1), chain = nil, clientId = clientIdV1, + networkId = NetworkId(1), chain = nil, clientId = ClientIdV1, addAllCapabilities = false, rng = rng) nodev1.addCapability Waku # Always enable Waku protocol @@ -138,7 +171,7 @@ proc start*(bridge: WakuBridge) {.async.} = trace "Bridging message from V2 to V1", msg=msg[] bridge.toWakuV1(msg[]) - bridge.nodev2.subscribe(defaultBridgeTopic, relayHandler) + bridge.nodev2.subscribe(DefaultBridgeTopic, relayHandler) proc stop*(bridge: WakuBridge) {.async.} = await bridge.nodev2.stop() @@ -182,7 +215,7 @@ when isMainModule: # Load address configuration let - (nodev1ExtIp, _, _) = setupNat(conf.nat, clientIdV1, + (nodev1ExtIp, _, _) = setupNat(conf.nat, ClientIdV1, Port(conf.devp2pTcpPort + conf.portsShift), Port(conf.udpPort + conf.portsShift)) # TODO: EthereumNode should have a better split of binding address and