fix: bridge loses connection to v1 (#1020)

* fix: bridge loses connection to v1

* Remove magic numbers in checks
This commit is contained in:
Hanno Cornelius 2022-06-27 14:16:15 +02:00 committed by GitHub
parent 6f37c7eba4
commit 9be64a55b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 143 additions and 13 deletions

View File

@ -1,7 +1,7 @@
{.used.}
import
std/strutils,
std/[sequtils, strutils, tables],
testutils/unittests,
chronicles, chronos, stew/shims/net as stewNet, stew/[byteutils, objects],
libp2p/crypto/crypto,
@ -37,7 +37,7 @@ procSuite "WakuBridge":
nodev2Key = crypto.PrivateKey.random(Secp256k1, rng[])[]
bridge = WakuBridge.new(
nodev1Key= nodev1Key,
nodev1Address = localAddress(30303),
nodev1Address = localAddress(30302),
powRequirement = 0.002,
rng = rng,
nodev2Key = nodev2Key,
@ -183,3 +183,60 @@ procSuite "WakuBridge":
bridge.nodeV1.resetMessageQueue()
v1Node.resetMessageQueue()
waitFor allFutures([bridge.stop(), v2Node.stop()])
asyncTest "Bridge manages its v1 connections":
# Given
let
# Waku v1 node
v1NodePool = @[setupTestNode(rng, Waku),
setupTestNode(rng, Waku),
setupTestNode(rng, Waku)]
targetV1Peers = v1NodePool.len() - 1
# Bridge
v1Bridge = WakuBridge.new(
nodev1Key= nodev1Key,
nodev1Address = localAddress(30303),
powRequirement = 0.002,
rng = rng,
nodev2Key = nodev2Key,
nodev2BindIp = ValidIpAddress.init("0.0.0.0"), nodev2BindPort= Port(60000),
nodev2PubsubTopic = DefaultBridgeTopic,
v1Pool = v1NodePool.mapIt(newNode(it.toEnode())),
targetV1Peers = targetV1Peers)
for node in v1NodePool:
node.startListening()
# When
waitFor v1Bridge.start()
await sleepAsync(2000.millis) # Give peers some time to connect
# Then
check:
v1Bridge.nodev1.peerPool.connectedNodes.len() == targetV1Peers
# When
let connected = v1Bridge.nodev1.peerPool.connectedNodes
for peer in connected.values():
waitFor peer.disconnect(SubprotocolReason)
# Then
check:
v1Bridge.nodev1.peerPool.connectedNodes.len() == 0
# When
discard v1Bridge.maintenanceLoop() # Forces one more run of the maintenance loop
await sleepAsync(2000.millis) # Give peers some time to connect
# Then
check:
v1Bridge.nodev1.peerPool.connectedNodes.len() == targetV1Peers
# Cleanup
v1Bridge.nodev1.resetMessageQueue()
for node in v1NodePool:
node.resetMessageQueue()
waitFor v1Bridge.stop()

View File

@ -8,7 +8,8 @@ import
stew/shims/net as stewNet, json_rpc/rpcserver,
# Waku v1 imports
eth/[keys, p2p], eth/common/utils,
eth/p2p/enode,
eth/p2p/[enode, peer_pool],
eth/p2p/discoveryv5/random2,
../v1/protocol/waku_protocol,
# Waku v2 imports
libp2p/crypto/crypto,
@ -35,7 +36,8 @@ const
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
ContentTopicApplication = "waku"
ContentTopicAppVersion = "1"
MaintenancePeriod = 1.minutes
TargetV1Peers = 4 # Target number of v1 connections to maintain. Could be made configurable in future.
#########
# Types #
@ -47,6 +49,10 @@ type
nodev2*: WakuNode
nodev2PubsubTopic: wakunode2.Topic # Pubsub topic to bridge to/from
seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication.
rng: ref BrHmacDrbgContext
v1Pool: seq[Node] # Pool of v1 nodes for possible connections
targetV1Peers: int # Target number of v1 peers to maintain
started: bool # Indicates that bridge is running
###################
# Helper funtions #
@ -150,6 +156,57 @@ proc toWakuV1(bridge: WakuBridge, msg: WakuMessage) {.gcsafe, raises: [Defect, L
topic = toV1Topic(msg.contentTopic),
payload = msg.payload)
proc connectToV1(bridge: WakuBridge, target: int) =
## Uses the initialised peer pool to attempt to connect
## to the set target number of v1 peers at random.
# First filter the peers in the pool that we're not yet connected to
var candidates = bridge.v1Pool.filterIt(it notin bridge.nodev1.peerPool.connectedNodes)
debug "connecting to v1", candidates=candidates.len(), target=target
# Now attempt connection to random peers from candidate list until we reach target
let maxAttempts = min(target, candidates.len())
trace "Attempting to connect to random peers from pool", target=maxAttempts
for i in 1..maxAttempts:
let
randIndex = rand(bridge.rng[], candidates.len() - 1)
randPeer = candidates[randIndex]
debug "Attempting to connect to random peer", randPeer
asyncSpawn bridge.nodev1.peerPool.connectToNode(randPeer)
candidates.delete(randIndex, randIndex)
if candidates.len() == 0:
# Stop when we've exhausted all candidates
break;
proc maintenanceLoop*(bridge: WakuBridge) {.async.} =
while bridge.started:
trace "running maintenance"
let
v1Connections = bridge.nodev1.peerPool.connectedNodes.len()
v2Connections = bridge.nodev2.switch.peerStore[AddressBook].len()
info "Bridge connectivity",
v1Peers=v1Connections,
v2Peers=v2Connections
# Replenish v1 connections if necessary
if v1Connections < bridge.targetV1Peers:
debug "Attempting to replenish v1 connections",
current=v1Connections,
target=bridge.targetV1Peers
bridge.connectToV1(bridge.targetV1Peers - v1Connections)
# TODO: we could do similar maintenance for v2 connections here
await sleepAsync(MaintenancePeriod)
##############
# Public API #
##############
@ -167,7 +224,9 @@ proc new*(T: type WakuBridge,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
nameResolver: NameResolver = nil,
# Bridge configuration
nodev2PubsubTopic: wakunode2.Topic): T
nodev2PubsubTopic: wakunode2.Topic,
v1Pool: seq[Node] = @[],
targetV1Peers = 0): T
{.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} =
# Setup Waku v1 node
@ -197,7 +256,12 @@ proc new*(T: type WakuBridge,
nodev2ExtIp, nodev2ExtPort,
nameResolver = nameResolver)
return WakuBridge(nodev1: nodev1, nodev2: nodev2, nodev2PubsubTopic: nodev2PubsubTopic)
return WakuBridge(nodev1: nodev1,
nodev2: nodev2,
nodev2PubsubTopic: nodev2PubsubTopic,
rng: rng,
v1Pool: v1Pool,
targetV1Peers: targetV1Peers)
proc start*(bridge: WakuBridge) {.async.} =
info "Starting WakuBridge"
@ -243,7 +307,11 @@ proc start*(bridge: WakuBridge) {.async.} =
bridge.nodev2.subscribe(bridge.nodev2PubsubTopic, relayHandler)
bridge.started = true
asyncSpawn bridge.maintenanceLoop()
proc stop*(bridge: WakuBridge) {.async.} =
bridge.started = false
await bridge.nodev2.stop()
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
@ -327,8 +395,17 @@ when isMainModule:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
dnsReslvr = DnsResolver.new(nameServers)
# Initialise bridge with a candidate pool of v1 nodes to connect to
var v1PoolStrs: seq[string]
if conf.staticnodesV1.len > 0: v1PoolStrs = conf.staticnodesV1
elif conf.fleetV1 == prod: v1PoolStrs = @WhisperNodes
elif conf.fleetV1 == staging: v1PoolStrs = @WhisperNodesStaging
elif conf.fleetV1 == test: v1PoolStrs = @WhisperNodesTest
let
v1Pool = v1PoolStrs.mapIt(newNode(ENode.fromString(it).expect("correct node addrs")))
bridge = WakuBridge.new(nodev1Key = conf.nodekeyV1,
nodev1Address = nodev1Address,
powRequirement = conf.wakuV1Pow,
@ -339,17 +416,13 @@ when isMainModule:
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort,
nameResolver = dnsReslvr,
nodev2PubsubTopic = conf.bridgePubsubTopic)
nodev2PubsubTopic = conf.bridgePubsubTopic,
v1Pool = v1Pool,
targetV1Peers = min(v1Pool.len(), TargetV1Peers))
waitFor bridge.start()
# Now load rest of config
# Optionally direct connect nodev1 with a set of nodes
if conf.staticnodesV1.len > 0: connectToNodes(bridge.nodev1, conf.staticnodesV1)
elif conf.fleetV1 == prod: connectToNodes(bridge.nodev1, WhisperNodes)
elif conf.fleetV1 == staging: connectToNodes(bridge.nodev1, WhisperNodesStaging)
elif conf.fleetV1 == test: connectToNodes(bridge.nodev1, WhisperNodesTest)
# Mount configured Waku v2 protocols
mountLibp2pPing(bridge.nodev2)