feat(networking): add relay connectivity loop (#1482)

* feat(networking): add relay connectivity loop

* Add unit tests

* feat(networking): fix comments

* Fix lnsd comments
This commit is contained in:
Alvaro Revuelta 2023-01-18 15:17:56 +01:00 committed by GitHub
parent 605cf1c38c
commit fd1ec4a74a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 19 deletions

View File

@ -30,6 +30,7 @@ import
./v2/test_waku_filter, ./v2/test_waku_filter,
./v2/test_wakunode_filter, ./v2/test_wakunode_filter,
./v2/test_waku_peer_exchange, ./v2/test_waku_peer_exchange,
./v2/test_peer_store_extended,
./v2/test_waku_payload, ./v2/test_waku_payload,
./v2/test_waku_swap, ./v2/test_waku_swap,
./v2/test_utils_peers, ./v2/test_utils_peers,

View File

@ -44,6 +44,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p1] = Connected peerStore[ConnectionBook][p1] = Connected
peerStore[DisconnectBook][p1] = 0 peerStore[DisconnectBook][p1] = 0
peerStore[SourceBook][p1] = Discv5 peerStore[SourceBook][p1] = Discv5
peerStore[DirectionBook][p1] = Inbound
# Peer2: Connected # Peer2: Connected
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()] peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
@ -54,6 +55,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p2] = Connected peerStore[ConnectionBook][p2] = Connected
peerStore[DisconnectBook][p2] = 0 peerStore[DisconnectBook][p2] = 0
peerStore[SourceBook][p2] = Discv5 peerStore[SourceBook][p2] = Discv5
peerStore[DirectionBook][p2] = Inbound
# Peer3: Connected # Peer3: Connected
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
@ -64,6 +66,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p3] = Connected peerStore[ConnectionBook][p3] = Connected
peerStore[DisconnectBook][p3] = 0 peerStore[DisconnectBook][p3] = 0
peerStore[SourceBook][p3] = Discv5 peerStore[SourceBook][p3] = Discv5
peerStore[DirectionBook][p3] = Inbound
# Peer4: Added but never connected # Peer4: Added but never connected
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()] peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
@ -74,6 +77,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p4] = NotConnected peerStore[ConnectionBook][p4] = NotConnected
peerStore[DisconnectBook][p4] = 0 peerStore[DisconnectBook][p4] = 0
peerStore[SourceBook][p4] = Discv5 peerStore[SourceBook][p4] = Discv5
peerStore[DirectionBook][p4] = Inbound
# Peer5: Connecteed in the past # Peer5: Connecteed in the past
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()] peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
@ -84,6 +88,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p5] = CanConnect peerStore[ConnectionBook][p5] = CanConnect
peerStore[DisconnectBook][p5] = 1000 peerStore[DisconnectBook][p5] = 1000
peerStore[SourceBook][p5] = Discv5 peerStore[SourceBook][p5] = Discv5
peerStore[DirectionBook][p5] = Outbound
test "get() returns the correct StoredInfo for a given PeerId": test "get() returns the correct StoredInfo for a given PeerId":
# When # When
@ -113,7 +118,7 @@ suite "Extended nim-libp2p Peer Store":
storedInfoPeer6.protoVersion == "" storedInfoPeer6.protoVersion == ""
storedInfoPeer6.connectedness == NotConnected storedInfoPeer6.connectedness == NotConnected
storedInfoPeer6.disconnectTime == 0 storedInfoPeer6.disconnectTime == 0
storedInfoPeer6.origin == Unknown storedInfoPeer6.origin == UnknownOrigin
test "peers() returns all StoredInfo of the PeerStore": test "peers() returns all StoredInfo of the PeerStore":
# When # When
@ -254,3 +259,24 @@ suite "Extended nim-libp2p Peer Store":
swapPeer.isSome() swapPeer.isSome()
swapPeer.get().peerId == p5 swapPeer.get().peerId == p5
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
test "getPeersByDirection()":
# When
let inPeers = peerStore.getPeersByDirection(Inbound)
let outPeers = peerStore.getPeersByDirection(Outbound)
# Then
check:
inPeers.len == 4
outPeers.len == 1
test "getNotConnectedPeers()":
# When
let disconnedtedPeers = peerStore.getNotConnectedPeers()
# Then
check:
disconnedtedPeers.len == 2
disconnedtedPeers.anyIt(it.peerId == p4)
disconnedtedPeers.anyIt(it.peerId == p5)
not disconnedtedPeers.anyIt(it.connectedness == Connected)

View File

@ -5,13 +5,14 @@ else:
import import
std/[options, sets, sequtils, times], std/[options, sets, sequtils, times, random],
chronos, chronos,
chronicles, chronicles,
metrics, metrics,
libp2p/multistream libp2p/multistream
import import
../../utils/peers, ../../utils/peers,
../../waku/v2/protocol/waku_relay,
./peer_store/peer_storage, ./peer_store/peer_storage,
./waku_peer_store ./waku_peer_store
@ -36,6 +37,12 @@ const
# TODO: Make configurable # TODO: Make configurable
DefaultDialTimeout = chronos.seconds(10) DefaultDialTimeout = chronos.seconds(10)
# limit the amount of paralel dials
MaxParalelDials = 10
# delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(30)
#################### ####################
# Helper functions # # Helper functions #
#################### ####################
@ -179,19 +186,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not attempt to manage our unmanageable self # Do not attempt to manage our unmanageable self
return return
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
# ...known addresses
for multiaddr in remotePeerInfo.addrs:
pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
# ...public key # ...public key
var publicKey: PublicKey var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey) discard remotePeerInfo.peerId.extractPublicKey(publicKey)
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
# Peer already managed
return
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
# nim-libp2p identify overrides this # TODO: Remove this once service slots is ready
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
# Add peer to storage. Entry will subsequently be updated with connectedness information # Add peer to storage. Entry will subsequently be updated with connectedness information
@ -284,3 +293,34 @@ proc connectToNodes*(pm: PeerManager,
# This issue was known to Dmitiry on nim-libp2p and may be resolvable # This issue was known to Dmitiry on nim-libp2p and may be resolvable
# later. # later.
await sleepAsync(chronos.seconds(5)) await sleepAsync(chronos.seconds(5))
# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
while true:
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers
# TODO: Enforce a given in/out peers ratio
# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
await sleepAsync(ConnectivityLoopInterval)
continue
# TODO: Respect backoff before attempting to connect a relay peer
var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
shuffle(disconnectedPeers)
let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParalelDials)
info "Relay connectivity loop",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
availableDisconnectedPeers = disconnectedPeers.len
await pm.connectToNodes(disconnectedPeers[0..<numPeersToConnect], WakuRelayCodec)
await sleepAsync(ConnectivityLoopInterval)

View File

@ -142,4 +142,7 @@ proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
return none(RemotePeerInfo) return none(RemotePeerInfo)
proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] = proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] =
return peerStore.peers().filterIt(it.direction == direction) return peerStore.peers.filterIt(it.direction == direction)
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)

View File

@ -392,6 +392,9 @@ proc startRelay*(node: WakuNode) {.async.} =
protocolMatcher(WakuRelayCodec), protocolMatcher(WakuRelayCodec),
backoffPeriod) backoffPeriod)
# Maintain relay connections
asyncSpawn node.peerManager.relayConnectivityLoop()
# Start the WakuRelay protocol # Start the WakuRelay protocol
await node.wakuRelay.start() await node.wakuRelay.start()
@ -897,9 +900,6 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive) asyncSpawn node.keepaliveLoop(defaultKeepalive)
# TODO: Decouple discovery logic from connection logic
# A discovered peer goes to the PeerStore
# The PeerManager uses to PeerStore to dial peers
proc runDiscv5Loop(node: WakuNode) {.async.} = proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes ## Continuously add newly discovered nodes
## using Node Discovery v5 ## using Node Discovery v5
@ -920,12 +920,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
trace "Discovered peers", count=discoveredPeers.get().len() trace "Discovered peers", count=discoveredPeers.get().len()
let newPeers = discoveredPeers.get().filterIt( for peer in discoveredPeers.get():
not node.switch.isConnected(it.peerId)) # TODO: proto: WakuRelayCodec will be removed from add peer
node.peerManager.addPeer(peer, WakuRelayCodec)
if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await node.connectToNodes(newPeers, "discv5")
# Discovery `queryRandom` can have a synchronous fast path for example # Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop. # when no peers are in the routing table. Don't run it in continuous loop.