Add WakuBridge and test (#426)

* Add WakuBridge and test

* Fix test
This commit is contained in:
Hanno Cornelius 2021-03-22 11:44:45 +02:00 committed by GitHub
parent 4ae1cd4737
commit 62b824c387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 235 additions and 93 deletions

View File

@ -12,7 +12,8 @@ import
./v2/test_jsonrpc_waku,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
./v2/test_waku_rln_relay
./v2/test_waku_rln_relay,
./v2/test_waku_bridge
# TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
# For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module

View File

@ -0,0 +1,89 @@
{.used.}
import
std/unittest,
chronicles, chronos, stew/shims/net as stewNet, stew/byteutils,
libp2p/crypto/crypto,
libp2p/crypto/secp,
libp2p/peerid,
libp2p/multiaddress,
libp2p/switch,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
eth/p2p,
eth/keys,
../../waku/common/wakubridge,
../../waku/v1/protocol/waku_protocol,
../../waku/v2/protocol/[waku_message, message_notifier],
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/node/wakunode2,
../test_helpers
procSuite "WakuBridge":
let rng = keys.newRng()
asyncTest "Messages are bridged between Waku v1 and Waku v2":
let
# Bridge
nodev1Key = keys.KeyPair.random(rng[])
nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[]
bridge = WakuBridge.new(
nodev1Key= nodev1Key,
nodev1Address = localAddress(30303),
powRequirement = 0.002,
rng = rng,
nodev2Key = nodev2Key,
nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000))
# Waku v1 node
v1Node = setupTestNode(rng, Waku)
# Waku v2 node
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
topic = [byte 0x00, 0, 0, byte 0x01]
contentTopic = ContentTopic(1)
payloadV1 = "hello from V1".toBytes()
payloadV2 = "hello from V2".toBytes()
message = WakuMessage(payload: payloadV2, contentTopic: contentTopic)
await bridge.start()
await v2Node.start()
v2Node.mountRelay(@[defaultBridgeTopic])
discard await v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
await v2Node.connectToNodes(@[bridge.nodev2.peerInfo])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk() and msg.value().version == 1:
completionFut.complete(true)
v2Node.subscribe(defaultBridgeTopic, relayHandler)
await sleepAsync(2000.millis)
# Test bridging from V2 to V1
await v2Node.publish(defaultBridgeTopic, message)
await sleepAsync(2000.millis)
check:
# v1Node received message published by v2Node
v1Node.protocolState(Waku).queue.items.len == 1
# Test bridging from V1 to V2
check:
v1Node.postMessage(ttl = 5,
topic = topic,
payload = payloadV1) == true
# v2Node received payload published by v1Node
await completionFut.withTimeout(5.seconds)
await bridge.stop()

View File

@ -215,6 +215,8 @@ procSuite "Waku rln relay":
# start rln-relay
await node.mountRlnRelay(ethClientAddress = some(EthClient), ethAccountAddress = some(ethAccountAddress), membershipContractAddress = some(membershipContractAddress))
await node.stop()
suite "Waku rln relay":
test "Keygen Nim Wrappers":
var

View File

@ -23,63 +23,91 @@ import
# Common cli config
./config_bridge
logScope:
topics = "wakubridge"
##################
# Default values #
##################
const
defaultBridgeTopic* = "/waku/2/default-bridge/proto"
clientIdV1 = "nim-waku v1 node"
defaultBridgeTopic = "/waku/2/default-bridge/proto"
defaultTTL = 5'u32
proc toWakuMessage(env: Envelope): WakuMessage =
#########
# Types #
#########
type
WakuBridge* = ref object of RootObj
nodev1*: EthereumNode
nodev2*: WakuNode
###################
# Helper funtions #
###################
func toWakuMessage(env: Envelope): WakuMessage =
# Translate a Waku v1 envelope to a Waku v2 message
WakuMessage(payload: env.data,
contentTopic: ContentTopic(uint32.fromBytes(env.topic)),
version: 1)
proc toWakuV2(env: Envelope, nodev2: WakuNode) {.async.} =
await nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
await bridge.nodev2.publish(defaultBridgeTopic, env.toWakuMessage())
proc toWakuV1(msg: WakuMessage, nodev1: EthereumNode) {.gcsafe.} =
discard nodev1.postMessage(ttl = defaultTTL,
topic = msg.contentTopic.toBytes(),
payload = msg.payload)
proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe.} =
discard bridge.nodev1.postMessage(ttl = defaultTTL,
topic = msg.contentTopic.toBytes(),
payload = msg.payload)
proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext):
EthereumNode =
let
(ipExt, _, _) = setupNat(config.nat, clientIdV1,
Port(config.devp2pTcpPort + config.portsShift),
Port(config.udpPort + config.portsShift))
# TODO: EthereumNode should have a better split of binding address and
# external address. Also, can't have different ports as it stands now.
address = if ipExt.isNone():
Address(ip: parseIpAddress("0.0.0.0"),
tcpPort: Port(config.devp2pTcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
else:
Address(ip: ipExt.get(),
tcpPort: Port(config.devp2pTcpPort + config.portsShift),
udpPort: Port(config.udpPort + config.portsShift))
##############
# Public API #
##############
proc new*(T: type WakuBridge,
# NodeV1 initialisation
nodev1Key: keys.KeyPair,
nodev1Address: Address,
powRequirement = 0.002,
rng: ref BrHmacDrbgContext,
# NodeV2 initialisation
nodev2Key: crypto.PrivateKey,
nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port]()): T =
# Set-up node
var node = newEthereumNode(config.nodekeyv1, address, NetworkId(1), nil, clientIdV1,
addAllCapabilities = false, rng = rng)
node.addCapability Waku # Always enable Waku protocol
# Set up the Waku configuration.
# Setup Waku v1 node
var
nodev1 = newEthereumNode(keys = nodev1Key, address = nodev1Address,
networkId = NetworkId(1), chain = nil, clientId = clientIdV1,
addAllCapabilities = false, rng = rng)
nodev1.addCapability Waku # Always enable Waku protocol
# Setup the Waku configuration.
# This node is being set up as a bridge so it gets configured as a node with
# a full bloom filter so that it will receive and forward all messages.
# TODO: What is the PoW setting now?
let wakuConfig = WakuConfig(powRequirement: config.wakuPow,
bloom: some(fullBloom()), isLightNode: false,
maxMsgSize: waku_protocol.defaultMaxMsgSize,
topics: none(seq[waku_protocol.Topic]))
node.configureWaku(wakuConfig)
let wakuConfig = WakuConfig(powRequirement: powRequirement,
bloom: some(fullBloom()), isLightNode: false,
maxMsgSize: waku_protocol.defaultMaxMsgSize,
topics: none(seq[waku_protocol.Topic]))
nodev1.configureWaku(wakuConfig)
# Optionally direct connect with a set of nodes
if config.staticnodesv1.len > 0: connectToNodes(node, config.staticnodesv1)
elif config.fleetv1 == prod: connectToNodes(node, WhisperNodes)
elif config.fleetv1 == staging: connectToNodes(node, WhisperNodesStaging)
elif config.fleetv1 == test: connectToNodes(node, WhisperNodesTest)
# Setup Waku v2 node
let
nodev2 = WakuNode.init(nodev2Key,
nodev2BindIp, nodev2BindPort,
nodev2ExtIp, nodev2ExtPort)
return WakuBridge(nodev1: nodev1, nodev2: nodev2)
let connectedFut = node.connectToNetwork(@[],
proc start*(bridge: WakuBridge) {.async.} =
info "Starting WakuBridge"
debug "Start listening on Waku v1"
# Start listening on Waku v1 node
let connectedFut = bridge.nodev1.connectToNetwork(@[],
true, # Always enable listening
false # Disable discovery (only discovery v4 is currently supported)
)
@ -88,38 +116,32 @@ proc startWakuV1(config: WakuNodeConf, rng: ref BrHmacDrbgContext):
if connectedFut.failed:
fatal "connectToNetwork failed", msg = connectedFut.readError.msg
quit(1)
# Start Waku v2 node
debug "Start listening on Waku v2"
await bridge.nodev2.start()
bridge.nodev2.mountRelay() # Always mount relay for bridge
return node
# Bridging
# Handle messages on Waku v1 and bridge to Waku v2
proc handleEnvReceived(envelope: Envelope) {.gcsafe.} =
trace "Bridging envelope from V1 to V2", envelope=envelope
waitFor bridge.toWakuV2(envelope)
proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
let
(extIp, extTcpPort, _) = setupNat(config.nat, clientId,
Port(uint16(config.libp2pTcpPort) + config.portsShift),
Port(uint16(config.udpPort) + config.portsShift))
node = WakuNode.init(config.nodeKeyv2, config.listenAddress,
Port(uint16(config.libp2pTcpPort) + config.portsShift), extIp, extTcpPort)
bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)
await node.start()
# Handle messages on Waku v2 and bridge to Waku v1
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
trace "Bridging message from V2 to V1", msg=msg[]
bridge.toWakuV1(msg[])
bridge.nodev2.subscribe(defaultBridgeTopic, relayHandler)
if config.store:
mountStore(node)
if config.filter:
mountFilter(node)
if config.relay:
mountRelay(node, config.topics.split(" "))
if config.staticnodesv2.len > 0:
waitFor connectToNodes(node, config.staticnodesv2)
if config.storenode != "":
setStorePeer(node, config.storenode)
if config.filternode != "":
setFilterPeer(node, config.filternode)
return node
proc stop*(bridge: WakuBridge) {.async.} =
await bridge.nodev2.stop()
when isMainModule:
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) =
@ -141,35 +163,63 @@ when isMainModule:
let
rng = keys.newRng()
let conf = WakuNodeConf.load()
conf = WakuNodeConf.load()
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
# Load address configuration
let
(nodev1ExtIp, _, _) = setupNat(conf.nat, clientIdV1,
Port(conf.devp2pTcpPort + conf.portsShift),
Port(conf.udpPort + conf.portsShift))
# TODO: EthereumNode should have a better split of binding address and
# external address. Also, can't have different ports as it stands now.
nodev1Address = if nodev1ExtIp.isNone():
Address(ip: parseIpAddress("0.0.0.0"),
tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
udpPort: Port(conf.udpPort + conf.portsShift))
else:
Address(ip: nodev1ExtIp.get(),
tcpPort: Port(conf.devp2pTcpPort + conf.portsShift),
udpPort: Port(conf.udpPort + conf.portsShift))
(nodev2ExtIp, nodev2ExtPort, _) = setupNat(conf.nat, clientId,
Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
let
bridge = WakuBridge.new(nodev1Key = conf.nodekeyv1,
nodev1Address = nodev1Address,
powRequirement = conf.wakuPow,
rng = rng,
nodev2Key = conf.nodeKeyv2,
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort)
var
nodev1 {.threadvar.}: EthereumNode
nodev2 {.threadvar.}: WakuNode
waitFor bridge.start()
nodev1 = startWakuV1(conf, rng)
nodev2 = waitFor startWakuV2(conf)
# Now load rest of config
# Optionally direct connect nodev1 with a set of nodes
if conf.staticnodesv1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesv1)
elif conf.fleetv1 == prod: connectToNodes(bridge.nodev1, WhisperNodes)
elif conf.fleetv1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging)
elif conf.fleetv1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)
# Handle messages on Waku v1 and bridge to Waku v2
proc handleEnvReceived(envelope: Envelope) {.gcsafe.} =
debug "Bridging envelope from V1 to V2", envelope=envelope
waitFor envelope.toWakuV2(nodev2)
# Mount configured Waku v2 protocols
if conf.store:
mountStore(bridge.nodev2)
nodev1.registerEnvReceivedHandler(handleEnvReceived)
if conf.filter:
mountFilter(bridge.nodev2)
# Handle messages on Waku v2 and bridge to Waku v1
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
debug "Bridging message from V2 to V1", msg=msg[]
msg[].toWakuV1(nodev1)
nodev2.subscribe(defaultBridgeTopic, relayHandler)
if conf.staticnodesv2.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodesv2)
if conf.storenode != "":
setStorePeer(bridge.nodev2, conf.storenode)
if conf.filternode != "":
setFilterPeer(bridge.nodev2, conf.filternode)
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
@ -177,10 +227,10 @@ when isMainModule:
var rpcServer = newRpcHttpServer([ta])
# Waku v1 RPC
let keys = newKeyStorage()
setupWakuRPC(nodev1, keys, rpcServer, rng)
setupWakuSimRPC(nodev1, rpcServer)
setupWakuRPC(bridge.nodev1, keys, rpcServer, rng)
setupWakuSimRPC(bridge.nodev1, rpcServer)
# Waku v2 rpc
startV2Rpc(nodev2, rpcServer, conf)
startV2Rpc(bridge.nodev2, rpcServer, conf)
rpcServer.start()