bug: connect instead dial relay peers (#1622)

This commit is contained in:
Alvaro Revuelta 2023-03-28 13:29:48 +02:00 committed by GitHub
parent c42ac16fe6
commit 85f33a8efd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 123 deletions

View File

@ -33,20 +33,32 @@ import
./testlib/waku2
procSuite "Peer Manager":
asyncTest "Peer dialing works":
asyncTest "connectRelay() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
check:
connOk == true
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected
asyncTest "dialPeer() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
# Dial node2 from node1
let conn = (await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)).get()
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
# Check connection
check:
conn.activity
conn.peerId == nodes[1].peerInfo.peerId
conn.isSome()
conn.get.activity
conn.get.peerId == nodes[1].peerInfo.peerId
# Check that node2 is being managed in node1
check:
@ -58,23 +70,25 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Dialing fails gracefully":
# Create 2 nodes
asyncTest "dialPeer() fails gracefully":
# Create 2 nodes and start them
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await nodes[0].start()
await nodes[0].mountRelay()
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
# Purposefully don't start node2
# Dial node2 from node1
let connOpt = await nodes[1].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
# Check connection failed gracefully
# Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
check:
connOpt.isNone()
conn1.isNone()
await nodes[0].stop()
# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
check:
conn2.isNone()
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Adding, selecting and filtering peers work":
let
@ -120,9 +134,7 @@ procSuite "Peer Manager":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await nodes[0].start()
# Do not start node2
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Test default connectedness for new peers
@ -131,16 +143,17 @@ procSuite "Peer Manager":
# No information about node2's connectedness
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
# Purposefully don't start node2
# Attempt dialing node2 from node1
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
# Failed connection
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
require:
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
check:
# Cannot connect to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
# Successful connection
await nodes[1].start()
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
require:
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true
check:
# Currently connected to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
@ -157,28 +170,31 @@ procSuite "Peer Manager":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await nodes[0].start()
await nodes[0].mountRelay()
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
nodes[0].peerManager.addPeer(nonExistentPeer)
# Set a low backoff to speed up test: 2, 4, 8, 16
nodes[0].peerManager.initialBackoffInSec = 2
nodes[0].peerManager.backoffFactor = 2
# node2 is not started, so dialing will fail
let conn1 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
# try to connect to peer that doesnt exist
let conn1Ok = await nodes[0].peerManager.connectRelay(nonExistentPeer)
check:
# Cannot connect to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].peerInfo.peerId] == CannotConnect
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 1
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
nodes[0].peerManager.peerStore[ConnectionBook][nonExistentPeer.peerId] == CannotConnect
nodes[0].peerManager.peerStore[NumberFailedConnBook][nonExistentPeer.peerId] == 1
# And the connection failed
conn1.isNone() == true
# Connection attempt failed
conn1Ok == false
# Right after failing there is a backoff period
nodes[0].peerManager.peerStore.canBeConnected(
nodes[1].peerInfo.peerId,
nonExistentPeer.peerId,
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == false
@ -192,13 +208,11 @@ procSuite "Peer Manager":
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == true
await nodes[1].start()
await nodes[1].mountRelay()
# Now we can connect and failed count is reset
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
# After a successful connection, the number of failed connections is reset
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
let conn2Ok = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
check:
conn2.isNone() == false
conn2Ok == true
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0
await allFutures(nodes.mapIt(it.stop()))
@ -217,7 +231,8 @@ procSuite "Peer Manager":
await node1.mountRelay()
await node2.mountRelay()
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
@ -265,7 +280,8 @@ procSuite "Peer Manager":
await node2.mountRelay()
node2.wakuRelay.codec = betaCodec
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
@ -353,9 +369,10 @@ procSuite "Peer Manager":
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# all nodes connect to peer 0
discard await nodes[1].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
discard await nodes[2].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
discard await nodes[3].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
require:
(await nodes[1].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true
check:
# Peerstore track all three peers

View File

@ -138,9 +138,9 @@ suite "WakuNode":
await node3.start()
await node3.mountRelay()
discard await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
discard await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(3.seconds)
discard await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
discard await node1.peerManager.connectRelay(node3.switch.peerInfo.toRemotePeerInfo())
check:
# Verify that only the first connection succeeded

View File

@ -116,9 +116,9 @@ suite "Waku Relay":
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
require:
conn.isSome()
connOk == true
## Given
let networkTopic = "test-network1"
@ -174,9 +174,9 @@ suite "Waku Relay":
await allFutures(srcSwitch.start(), dstSwitch.start())
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
require:
conn.isSome()
connOk == true
## Given
let networkTopic = "test-network1"

View File

@ -221,8 +221,9 @@ suite "WakuNode - Relay":
await allFutures(nodes.mapIt(it.mountRelay()))
# Connect nodes
let conn = await nodes[0].peerManager.dialPeer(nodes[1].switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
require conn.isSome
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
require:
connOk == true
# Node 1 subscribes to topic
nodes[1].subscribe(DefaultPubsubTopic)

View File

@ -45,8 +45,8 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
debug "post_waku_v2_admin_v1_peers"
for i, peer in peers:
let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc")
if conn.isNone():
let connOk = await node.peerManager.connectRelay(parseRemotePeerInfo(peer), source="rpc")
if not connOk:
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
return true

View File

@ -87,29 +87,108 @@ proc insertOrReplace(ps: PeerStorage,
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])
proc dialPeer(pm: PeerManager, peerId: PeerID,
addrs: seq[MultiAddress], proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
# Adds peer to manager for the specified protocol
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
return
# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
# Peer already managed
return
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
# Connects to a given node. Note that this function uses `connect` and
# does not provide a protocol. Streams for relay (gossipsub) are created
# automatically without the needing to dial.
proc connectRelay*(pm: PeerManager,
peer: RemotePeerInfo,
dialTimeout = DefaultDialTimeout,
source = "api"): Future[bool] {.async.} =
let peerId = peer.peerId
# Do not attempt to dial self
if peerId == pm.switch.peerInfo.peerId:
return none(Connection)
return false
if not pm.peerStore.hasPeer(peerId, WakuRelayCodec):
pm.addPeer(peer)
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
debug "Dialing peer", wireAddr = addrs, peerId = peerId, failedAttempts=failedAttempts
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
var deadline = sleepAsync(dialTimeout)
var workfut = pm.switch.connect(peerId, peer.addrs)
var reasonFailed = ""
try:
await workfut or deadline
if workfut.finished():
if not deadline.finished():
deadline.cancel()
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return true
else:
reasonFailed = "timed out"
await cancelAndWait(workfut)
except CatchableError as exc:
reasonFailed = "remote peer failed"
# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
debug "Connecting relay peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])
return false
# Dialing should be used for just protocols that require a stream to write and read
# This shall not be used to dial Relay protocols, since that would create
# unneccesary unused streams.
proc dialPeer(pm: PeerManager,
peerId: PeerID,
addrs: seq[MultiAddress],
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"): Future[Option[Connection]] {.async.} =
if peerId == pm.switch.peerInfo.peerId:
error "could not dial self"
return none(Connection)
if proto == WakuRelayCodec:
error "dial shall not be used to connect to relays"
return none(Connection)
debug "Dialing peer", wireAddr=addrs, peerId=peerId, proto=proto
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
var reasonFailed = ""
try:
if (await dialFut.withTimeout(dialTimeout)):
waku_peers_dials.inc(labelValues = ["successful"])
# TODO: This will be populated from the peerstore info when ready
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return some(dialFut.read())
else:
reasonFailed = "timeout"
@ -117,20 +196,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
except CatchableError as exc:
reasonFailed = "failed"
# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
debug "Dialing peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])
# Update storage
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
return none(Connection)
@ -255,31 +321,6 @@ proc new*(T: type PeerManager,
# Manager interface #
#####################
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
# Adds peer to manager for the specified protocol
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
return
# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
# Peer already managed
return
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
@ -303,16 +344,16 @@ proc reconnectPeers*(pm: PeerManager,
debug "Reconnecting peers", proto=proto
# Proto is not persisted, we need to iterate over all peers.
for storedInfo in pm.peerStore.peers(protocolMatcher(proto)):
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if storedInfo.connectedness == CannotConnect:
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
if peerInfo.connectedness == CannotConnect:
debug "Not reconnecting to unreachable or non-existing peer", peerId=peerInfo.peerId
continue
# Respect optional backoff period where applicable.
let
# TODO: Add method to peerStore (eg isBackoffExpired())
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
@ -320,12 +361,11 @@ proc reconnectPeers*(pm: PeerManager,
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
if backoffTime > ZeroDuration:
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
debug "Backing off before reconnect...", peerId=peerInfo.peerId, backoffTime=backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
trace "Reconnecting to peer", peerId= $storedInfo.peerId
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
discard await pm.connectRelay(peerInfo)
####################
# Dialer interface #
@ -362,7 +402,6 @@ proc dialPeer*(pm: PeerManager,
proc connectToNodes*(pm: PeerManager,
nodes: seq[string]|seq[RemotePeerInfo],
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api") {.async.} =
if nodes.len == 0:
@ -370,14 +409,14 @@ proc connectToNodes*(pm: PeerManager,
info "Dialing multiple peers", numOfPeers = nodes.len
var futConns: seq[Future[Option[Connection]]]
var futConns: seq[Future[bool]]
for node in nodes:
let node = when node is string: parseRemotePeerInfo(node)
else: node
futConns.add(pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source))
futConns.add(pm.connectRelay(node))
await allFutures(futConns)
let successfulConns = futConns.mapIt(it.read()).countIt(it.isSome)
let successfulConns = futConns.mapIt(it.read()).countIt(true)
info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len
@ -414,7 +453,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
proc prunePeerStore*(pm: PeerManager) =
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len

View File

@ -380,8 +380,8 @@ proc info*(node: WakuNode): WakuInfo =
proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE This is dialing on WakuRelay protocol specifically
await peer_manager.connectToNodes(node.peerManager, nodes, WakuRelayCodec, source=source)
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await connectToNodes(node.peerManager, nodes, source=source)
## Waku relay