chore: test peer connection management (#3049)

* Make some useful consts public, add some utils.
* Implement various utilities.
* peer_manager reconnectPeers enhancements

---------

Co-authored-by: Álex Cabeza Romero <alex93cabeza@gmail.com>
This commit is contained in:
Ivan FB 2024-09-24 18:20:29 +02:00 committed by GitHub
parent 7c4a9717a6
commit 711e7db1e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 69 additions and 97 deletions

View File

@ -8,6 +8,7 @@ import
chronos,
# chronos/timer,
chronicles,
times,
libp2p/[peerstore, crypto/crypto, multiaddress]
from times import getTime, toUnix
@ -62,9 +63,9 @@ suite "Peer Manager":
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, listenIp, listenPort)
server = newTestWakuNode(serverKey, listenIp, Port(3000))
serverPeerStore = server.peerManager.peerStore
client = newTestWakuNode(clientKey, listenIp, listenPort)
client = newTestWakuNode(clientKey, listenIp, Port(3001))
clientPeerStore = client.peerManager.peerStore
await allFutures(server.start(), client.start())
@ -577,77 +578,49 @@ suite "Peer Manager":
Connectedness.CannotConnect
suite "Automatic Reconnection":
xasyncTest "Automatic Reconnection Implementation":
asyncTest "Automatic Reconnection Implementation":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
await client.disconnectNode(serverRemotePeerInfo)
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
# When triggering the reconnection
await client.peerManager.reconnectPeers(WakuRelayCodec)
# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
xasyncTest "Automatic Reconnection Implementation (With Backoff)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
waitFor allFutures(server.switch.stop(), client.switch.stop())
waitFor allFutures(server.switch.start(), client.switch.start())
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
## Now let's do the same but with backoff period
await client.disconnectNode(serverRemotePeerInfo)
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
# When triggering a reconnection with a backoff period
let
backoffPeriod = 10.seconds
halfBackoffPeriod = 5.seconds
let backoffPeriod = chronos.seconds(1)
let beforeReconnect = getTime().toUnixFloat()
await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
await sleepAsync(halfBackoffPeriod)
# If the backoff period is not over, then the peers should still be marked as CanConnect
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
# When waiting for the backoff period to be over
await sleepAsync(halfBackoffPeriod)
# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
xasyncTest "Automatic Reconnection Implementation (After client restart)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
# When triggering the reconnection, and some time for the reconnection to happen
waitFor allFutures(client.stop(), server.stop())
await allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT_LONG)
let reconnectDurationWithBackoffPeriod =
getTime().toUnixFloat() - beforeReconnect
# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
reconnectDurationWithBackoffPeriod > backoffPeriod.seconds.float
suite "Handling Connections on Different Networks":
# TODO: Implement after discv5 and peer manager's interaction is understood

View File

@ -1,4 +1,4 @@
import testutils/unittests
import testutils/unittests, chronos
template xsuite*(name: string, body: untyped) =
discard
@ -27,3 +27,11 @@ template xasyncTest*(name: string, body: untyped) =
template asyncTestx*(name: string, body: untyped) =
test name:
skip()
template waitActive*(condition: bool) =
for i in 0 ..< 200:
if condition:
break
await sleepAsync(10)
assert condition

View File

@ -226,6 +226,10 @@ proc connectRelay*(
return false
proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
let peerId = peer.peerId
await pm.switch.disconnect(peerId)
# Dialing should be used for just protocols that require a stream to write and read
# This shall not be used to dial Relay protocols, since that would create
# unneccesary unused streams.
@ -560,46 +564,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
pm.addPeer(remotePeerInfo)
proc reconnectPeers*(
pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
trace "Reconnecting peers", proto = proto
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue
# Respect optional backoff period where applicable.
let
# TODO: Add method to peerStore (eg isBackoffExpired())
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
currentTime = Moment.init(getTime().toUnix, Second)
# Current time comparable to persisted value
backoffTime = disconnectTime + backoff - currentTime
# Consider time elapsed since last disconnect
trace "Respecting backoff",
backoff = backoff,
disconnectTime = disconnectTime,
currentTime = currentTime,
backoffTime = backoffTime
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
if backoffTime > ZeroDuration:
trace "Backing off before reconnect...",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
discard await pm.connectRelay(peerInfo)
####################
# Dialer interface #
####################
@ -647,7 +611,7 @@ proc connectToNodes*(
if nodes.len == 0:
return
info "Dialing multiple peers", numOfPeers = nodes.len
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes
var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
@ -685,6 +649,30 @@ proc connectToNodes*(
# later.
await sleepAsync(chronos.seconds(5))
proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
debug "Reconnecting peers", proto = proto
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue
if backoffTime > ZeroDuration:
debug "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
await pm.connectToNodes(@[peerInfo])
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.

View File

@ -198,6 +198,9 @@ proc connectToNodes*(
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source = source)
proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
await peer_manager.disconnectNode(node.peerManager, remotePeer)
## Waku Sync
proc mountWakuSync*(