Duplicate filtering for bridge (#556)

This commit is contained in:
Hanno Cornelius 2021-05-20 18:03:56 +02:00 committed by GitHub
parent 9b38e5c893
commit 2691b3e506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 48 deletions

View File

@ -22,58 +22,81 @@ import
../test_helpers
procSuite "WakuBridge":
let rng = keys.newRng()
###############
# Suite setup #
###############
let
rng = keys.newRng()
# Bridge
nodev1Key = keys.KeyPair.random(rng[])
nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[]
bridge = WakuBridge.new(
nodev1Key= nodev1Key,
nodev1Address = localAddress(30303),
powRequirement = 0.002,
rng = rng,
nodev2Key = nodev2Key,
nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000))
# Waku v1 node
v1Node = setupTestNode(rng, Waku)
# Waku v2 node
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
contentTopic = ContentTopic("0001")
topic = toArray(4, contentTopic.toBytes()[0..3])
payloadV1 = "hello from V1".toBytes()
payloadV2 = "hello from V2".toBytes()
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
########################
# Tests setup/teardown #
########################
setup:
# Runs before each test
waitFor bridge.start()
waitFor v2Node.start()
v2Node.mountRelay(@[DefaultBridgeTopic])
discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo])
teardown:
# Runs after each test
bridge.nodeV1.resetMessageQueue()
v1Node.resetMessageQueue()
waitFor allFutures([bridge.stop(), v2Node.stop()])
###############
# Suite tests #
###############
asyncTest "Messages are bridged between Waku v1 and Waku v2":
let
# Bridge
nodev1Key = keys.KeyPair.random(rng[])
nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[]
bridge = WakuBridge.new(
nodev1Key= nodev1Key,
nodev1Address = localAddress(30303),
powRequirement = 0.002,
rng = rng,
nodev2Key = nodev2Key,
nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000))
# Waku v1 node
v1Node = setupTestNode(rng, Waku)
# Waku v2 node
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
contentTopic = ContentTopic("0001")
topic = toArray(4, contentTopic.toBytes()[0..3])
payloadV1 = "hello from V1".toBytes()
payloadV2 = "hello from V2".toBytes()
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
await bridge.start()
await v2Node.start()
v2Node.mountRelay(@[defaultBridgeTopic])
discard await v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
await v2Node.connectToNodes(@[bridge.nodev2.peerInfo])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk() and msg.value().version == 1:
check:
# Message fields are as expected
msg.value().contentTopic == contentTopic # Topic translation worked
string.fromBytes(msg.value().payload).contains("from V1")
completionFut.complete(true)
v2Node.subscribe(defaultBridgeTopic, relayHandler)
v2Node.subscribe(DefaultBridgeTopic, relayHandler)
await sleepAsync(2000.millis)
# Test bridging from V2 to V1
await v2Node.publish(defaultBridgeTopic, message)
await v2Node.publish(DefaultBridgeTopic, message)
await sleepAsync(2000.millis)
@ -97,5 +120,13 @@ procSuite "WakuBridge":
# v2Node received payload published by v1Node
await completionFut.withTimeout(5.seconds)
await bridge.stop()
# Test filtering of WakuMessage duplicates
v1Node.resetMessageQueue()
await v2Node.publish(DefaultBridgeTopic, message)
await sleepAsync(2000.millis)
check:
# v1Node did not receive duplicate of previous message
v1Node.protocolState(Waku).queue.items.len == 0

View File

@ -1,5 +1,5 @@
import
std/tables,
std/[tables, hashes, sequtils],
chronos, confutils, chronicles, chronicles/topics_registry, metrics,
stew/[byteutils, objects],
stew/shims/net as stewNet, json_rpc/rpcserver,
@ -15,6 +15,7 @@ import
./config_bridge
declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"]
declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"]
logScope:
topics = "wakubridge"
@ -24,9 +25,10 @@ logScope:
##################
const
defaultBridgeTopic* = "/waku/2/default-bridge/proto"
clientIdV1 = "nim-waku v1 node"
defaultTTL = 5'u32
DefaultBridgeTopic* = "/waku/2/default-bridge/proto"
ClientIdV1 = "nim-waku v1 node"
DefaultTTL = 5'u32
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
#########
# Types #
@ -36,11 +38,24 @@ type
WakuBridge* = ref object of RootObj
nodev1*: EthereumNode
nodev2*: WakuNode
seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication.
###################
# Helper funtions #
###################
proc containsOrAdd(sequence: var seq[hashes.Hash], hash: hashes.Hash): bool =
if sequence.contains(hash):
return true
if sequence.len >= DeduplQSize:
trace "Deduplication queue full. Removing oldest item."
sequence.delete 0, 0 # Remove first item in queue
sequence.add(hash)
return false
func toWakuMessage(env: Envelope): WakuMessage =
# Translate a Waku v1 envelope to a Waku v2 message
WakuMessage(payload: env.data,
@ -48,17 +63,35 @@ func toWakuMessage(env: Envelope): WakuMessage =
version: 1)
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
let msg = env.toWakuMessage()
debug "message converted to V2", msg=msg
if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
# This is a duplicate message. Return
trace "Already seen. Dropping.", msg=msg
waku_bridge_dropped.inc(labelValues = ["duplicate"])
return
waku_bridge_transfers.inc(labelValues = ["v1_to_v2"])
await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
await bridge.nodev2.publish(DefaultBridgeTopic, msg)
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
debug "sending message to V1", msg=msg
if bridge.seen.containsOrAdd(msg.encode().buffer.hash()):
# This is a duplicate message. Return
trace "Already seen. Dropping.", msg=msg
waku_bridge_dropped.inc(labelValues = ["duplicate"])
return
waku_bridge_transfers.inc(labelValues = ["v2_to_v1"])
# @TODO: use namespacing to map v2 contentTopics to v1 topics
let v1TopicSeq = msg.contentTopic.toBytes()[0..3]
discard bridge.nodev1.postMessage(ttl = defaultTTL,
discard bridge.nodev1.postMessage(ttl = DefaultTTL,
topic = toArray(4, v1TopicSeq),
payload = msg.payload)
@ -79,7 +112,7 @@ proc new*(T: type WakuBridge,
# Setup Waku v1 node
var
nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address,
networkId = NetworkId(1), chain = nil, clientId = clientIdV1,
networkId = NetworkId(1), chain = nil, clientId = ClientIdV1,
addAllCapabilities = false, rng = rng)
nodev1.addCapability Waku # Always enable Waku protocol
@ -138,7 +171,7 @@ proc start*(bridge: WakuBridge) {.async.} =
trace "Bridging message from V2 to V1", msg=msg[]
bridge.toWakuV1(msg[])
bridge.nodev2.subscribe(defaultBridgeTopic, relayHandler)
bridge.nodev2.subscribe(DefaultBridgeTopic, relayHandler)
proc stop*(bridge: WakuBridge) {.async.} =
await bridge.nodev2.stop()
@ -182,7 +215,7 @@ when isMainModule:
# Load address configuration
let
(nodev1ExtIp, _, _) = setupNat(conf.nat, clientIdV1,
(nodev1ExtIp, _, _) = setupNat(conf.nat, ClientIdV1,
Port(conf.devp2pTcpPort + conf.portsShift),
Port(conf.udpPort + conf.portsShift))
# TODO: EthereumNode should have a better split of binding address and