From 9be64a55b63cffed7460eb784121e60dd958cb68 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 27 Jun 2022 14:16:15 +0200 Subject: [PATCH] fix: bridge loses connection to v1 (#1020) * fix: bridge loses connection to v1 * Remove magic numbers in checks --- tests/v2/test_waku_bridge.nim | 61 +++++++++++++++++++++- waku/common/wakubridge.nim | 95 +++++++++++++++++++++++++++++++---- 2 files changed, 143 insertions(+), 13 deletions(-) diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index 73f7fa2e3..6f33e8c0e 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -1,7 +1,7 @@ {.used.} import - std/strutils, + std/[sequtils, strutils, tables], testutils/unittests, chronicles, chronos, stew/shims/net as stewNet, stew/[byteutils, objects], libp2p/crypto/crypto, @@ -37,7 +37,7 @@ procSuite "WakuBridge": nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[] bridge = WakuBridge.new( nodev1Key= nodev1Key, - nodev1Address = localAddress(30303), + nodev1Address = localAddress(30302), powRequirement = 0.002, rng = rng, nodev2Key = nodev2Key, @@ -183,3 +183,60 @@ procSuite "WakuBridge": bridge.nodeV1.resetMessageQueue() v1Node.resetMessageQueue() waitFor allFutures([bridge.stop(), v2Node.stop()]) + + asyncTest "Bridge manages its v1 connections": + # Given + let + # Waku v1 node + v1NodePool = @[setupTestNode(rng, Waku), + setupTestNode(rng, Waku), + setupTestNode(rng, Waku)] + targetV1Peers = v1NodePool.len() - 1 + + # Bridge + v1Bridge = WakuBridge.new( + nodev1Key= nodev1Key, + nodev1Address = localAddress(30303), + powRequirement = 0.002, + rng = rng, + nodev2Key = nodev2Key, + nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000), + nodev2PubsubTopic = DefaultBridgeTopic, + v1Pool = v1NodePool.mapIt(newNode(it.toEnode())), + targetV1Peers = targetV1Peers) + + for node in v1NodePool: + node.startListening() + + # When + waitFor v1Bridge.start() + await sleepAsync(2000.millis) # Give peers some time to connect + + # Then + check: + v1Bridge.nodev1.peerPool.connectedNodes.len() == targetV1Peers + + # When + let connected = v1Bridge.nodev1.peerPool.connectedNodes + for peer in connected.values(): + waitFor peer.disconnect(SubprotocolReason) + + # Then + check: + v1Bridge.nodev1.peerPool.connectedNodes.len() == 0 + + # When + discard v1Bridge.maintenanceLoop() # Forces one more run of the maintenance loop + await sleepAsync(2000.millis) # Give peers some time to connect + + # Then + check: + v1Bridge.nodev1.peerPool.connectedNodes.len() == targetV1Peers + + # Cleanup + v1Bridge.nodev1.resetMessageQueue() + + for node in v1NodePool: + node.resetMessageQueue() + + waitFor v1Bridge.stop() diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index 955906738..821d95532 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -8,7 +8,8 @@ import stew/shims/net as stewNet, json_rpc/rpcserver, # Waku v1 imports eth/[keys, p2p], eth/common/utils, - eth/p2p/enode, + eth/p2p/[enode, peer_pool], + eth/p2p/discoveryv5/random2, ../v1/protocol/waku_protocol, # Waku v2 imports libp2p/crypto/crypto, @@ -35,7 +36,8 @@ const DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue ContentTopicApplication = "waku" ContentTopicAppVersion = "1" - + MaintenancePeriod = 1.minutes + TargetV1Peers = 4 # Target number of v1 connections to maintain. Could be made configurable in future. ######### # Types # @@ -47,6 +49,10 @@ type nodev2*: WakuNode nodev2PubsubTopic: wakunode2.Topic # Pubsub topic to bridge to/from seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication. + rng: ref BrHmacDrbgContext + v1Pool: seq[Node] # Pool of v1 nodes for possible connections + targetV1Peers: int # Target number of v1 peers to maintain + started: bool # Indicates that bridge is running ################### # Helper funtions # @@ -150,6 +156,57 @@ proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [Defect, L topic = toV1Topic(msg.contentTopic), payload = msg.payload) +proc connectToV1(bridge: WakuBridge, target: int) = + ## Uses the initialised peer pool to attempt to connect + ## to the set target number of v1 peers at random. + + # First filter the peers in the pool that we're not yet connected to + var candidates = bridge.v1Pool.filterIt(it notin bridge.nodev1.peerPool.connectedNodes) + + debug "connecting to v1", candidates=candidates.len(), target=target + + # Now attempt connection to random peers from candidate list until we reach target + let maxAttempts = min(target, candidates.len()) + + trace "Attempting to connect to random peers from pool", target=maxAttempts + for i in 1..maxAttempts: + let + randIndex = rand(bridge.rng[], candidates.len() - 1) + randPeer = candidates[randIndex] + + debug "Attempting to connect to random peer", randPeer + asyncSpawn bridge.nodev1.peerPool.connectToNode(randPeer) + + candidates.delete(randIndex, randIndex) + if candidates.len() == 0: + # Stop when we've exhausted all candidates + break; + +proc maintenanceLoop*(bridge: WakuBridge) {.async.} = + while bridge.started: + trace "running maintenance" + + let + v1Connections = bridge.nodev1.peerPool.connectedNodes.len() + v2Connections = bridge.nodev2.switch.peerStore[AddressBook].len() + + info "Bridge connectivity", + v1Peers=v1Connections, + v2Peers=v2Connections + + # Replenish v1 connections if necessary + + if v1Connections < bridge.targetV1Peers: + debug "Attempting to replenish v1 connections", + current=v1Connections, + target=bridge.targetV1Peers + + bridge.connectToV1(bridge.targetV1Peers - v1Connections) + + # TODO: we could do similar maintenance for v2 connections here + + await sleepAsync(MaintenancePeriod) + ############## # Public API # ############## @@ -167,7 +224,9 @@ proc new*(T: type WakuBridge, nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nameResolver: NameResolver = nil, # Bridge configuration - nodev2PubsubTopic: wakunode2.Topic): T + nodev2PubsubTopic: wakunode2.Topic, + v1Pool: seq[Node] = @[], + targetV1Peers = 0): T {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} = # Setup Waku v1 node @@ -197,7 +256,12 @@ proc new*(T: type WakuBridge, nodev2ExtIp, nodev2ExtPort, nameResolver = nameResolver) - return WakuBridge(nodev1: nodev1, nodev2: nodev2, nodev2PubsubTopic: nodev2PubsubTopic) + return WakuBridge(nodev1: nodev1, + nodev2: nodev2, + nodev2PubsubTopic: nodev2PubsubTopic, + rng: rng, + v1Pool: v1Pool, + targetV1Peers: targetV1Peers) proc start*(bridge: WakuBridge) {.async.} = info "Starting WakuBridge" @@ -243,7 +307,11 @@ proc start*(bridge: WakuBridge) {.async.} = bridge.nodev2.subscribe(bridge.nodev2PubsubTopic, relayHandler) + bridge.started = true + asyncSpawn bridge.maintenanceLoop() + proc stop*(bridge: WakuBridge) {.async.} = + bridge.started = false await bridge.nodev2.stop() {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError @@ -327,8 +395,17 @@ when isMainModule: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 dnsReslvr = DnsResolver.new(nameServers) + + # Initialise bridge with a candidate pool of v1 nodes to connect to + var v1PoolStrs: seq[string] + + if conf.staticnodesV1.len > 0: v1PoolStrs = conf.staticnodesV1 + elif conf.fleetV1 == prod: v1PoolStrs = @WhisperNodes + elif conf.fleetV1 == staging: v1PoolStrs = @WhisperNodesStaging + elif conf.fleetV1 == test: v1PoolStrs = @WhisperNodesTest let + v1Pool = v1PoolStrs.mapIt(newNode(ENode.fromString(it).expect("correct node addrs"))) bridge = WakuBridge.new(nodev1Key = conf.nodekeyV1, nodev1Address = nodev1Address, powRequirement = conf.wakuV1Pow, @@ -339,17 +416,13 @@ when isMainModule: nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort, nameResolver = dnsReslvr, - nodev2PubsubTopic = conf.bridgePubsubTopic) + nodev2PubsubTopic = conf.bridgePubsubTopic, + v1Pool = v1Pool, + targetV1Peers = min(v1Pool.len(), TargetV1Peers)) waitFor bridge.start() # Now load rest of config - # Optionally direct connect nodev1 with a set of nodes - - if conf.staticnodesV1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesV1) - elif conf.fleetV1 == prod: connectToNodes(bridge.nodev1, WhisperNodes) - elif conf.fleetV1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging) - elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest) # Mount configured Waku v2 protocols mountLibp2pPing(bridge.nodev2)