mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 08:23:08 +00:00
bug: connect instead dial relay peers (#1622)
This commit is contained in:
parent
bdf5cd1c54
commit
4e006e5ca1
@ -33,20 +33,32 @@ import
|
|||||||
./testlib/waku2
|
./testlib/waku2
|
||||||
|
|
||||||
procSuite "Peer Manager":
|
procSuite "Peer Manager":
|
||||||
asyncTest "Peer dialing works":
|
asyncTest "connectRelay() works":
|
||||||
|
# Create 2 nodes
|
||||||
|
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
|
||||||
|
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
|
||||||
|
check:
|
||||||
|
connOk == true
|
||||||
|
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
|
||||||
|
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected
|
||||||
|
|
||||||
|
asyncTest "dialPeer() works":
|
||||||
# Create 2 nodes
|
# Create 2 nodes
|
||||||
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
|
||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
|
||||||
# Dial node2 from node1
|
# Dial node2 from node1
|
||||||
let conn = (await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)).get()
|
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
|
||||||
|
|
||||||
# Check connection
|
# Check connection
|
||||||
check:
|
check:
|
||||||
conn.activity
|
conn.isSome()
|
||||||
conn.peerId == nodes[1].peerInfo.peerId
|
conn.get.activity
|
||||||
|
conn.get.peerId == nodes[1].peerInfo.peerId
|
||||||
|
|
||||||
# Check that node2 is being managed in node1
|
# Check that node2 is being managed in node1
|
||||||
check:
|
check:
|
||||||
@ -58,23 +70,25 @@ procSuite "Peer Manager":
|
|||||||
|
|
||||||
await allFutures(nodes.mapIt(it.stop()))
|
await allFutures(nodes.mapIt(it.stop()))
|
||||||
|
|
||||||
asyncTest "Dialing fails gracefully":
|
asyncTest "dialPeer() fails gracefully":
|
||||||
# Create 2 nodes
|
# Create 2 nodes and start them
|
||||||
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
|
||||||
await nodes[0].start()
|
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
await nodes[0].mountRelay()
|
|
||||||
|
|
||||||
# Purposefully don't start node2
|
# Dial non-existent peer from node1
|
||||||
|
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
|
||||||
# Dial node2 from node1
|
|
||||||
let connOpt = await nodes[1].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
|
||||||
|
|
||||||
# Check connection failed gracefully
|
|
||||||
check:
|
check:
|
||||||
connOpt.isNone()
|
conn1.isNone()
|
||||||
|
|
||||||
await nodes[0].stop()
|
# Dial peer not supporting given protocol
|
||||||
|
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
|
||||||
|
check:
|
||||||
|
conn2.isNone()
|
||||||
|
|
||||||
|
await allFutures(nodes.mapIt(it.stop()))
|
||||||
|
|
||||||
asyncTest "Adding, selecting and filtering peers work":
|
asyncTest "Adding, selecting and filtering peers work":
|
||||||
let
|
let
|
||||||
@ -120,9 +134,7 @@ procSuite "Peer Manager":
|
|||||||
# Create 2 nodes
|
# Create 2 nodes
|
||||||
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
|
||||||
await nodes[0].start()
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
# Do not start node2
|
|
||||||
|
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
|
||||||
# Test default connectedness for new peers
|
# Test default connectedness for new peers
|
||||||
@ -131,16 +143,17 @@ procSuite "Peer Manager":
|
|||||||
# No information about node2's connectedness
|
# No information about node2's connectedness
|
||||||
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
|
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
|
||||||
|
|
||||||
# Purposefully don't start node2
|
# Failed connection
|
||||||
# Attempt dialing node2 from node1
|
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
require:
|
||||||
|
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
|
||||||
check:
|
check:
|
||||||
# Cannot connect to node2
|
# Cannot connect to node2
|
||||||
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
|
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
|
||||||
|
|
||||||
# Successful connection
|
# Successful connection
|
||||||
await nodes[1].start()
|
require:
|
||||||
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true
|
||||||
check:
|
check:
|
||||||
# Currently connected to node2
|
# Currently connected to node2
|
||||||
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
|
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
|
||||||
@ -157,28 +170,31 @@ procSuite "Peer Manager":
|
|||||||
# Create 2 nodes
|
# Create 2 nodes
|
||||||
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||||
|
|
||||||
await nodes[0].start()
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await nodes[0].mountRelay()
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
|
|
||||||
|
let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e")
|
||||||
|
|
||||||
|
nodes[0].peerManager.addPeer(nonExistentPeer)
|
||||||
|
|
||||||
# Set a low backoff to speed up test: 2, 4, 8, 16
|
# Set a low backoff to speed up test: 2, 4, 8, 16
|
||||||
nodes[0].peerManager.initialBackoffInSec = 2
|
nodes[0].peerManager.initialBackoffInSec = 2
|
||||||
nodes[0].peerManager.backoffFactor = 2
|
nodes[0].peerManager.backoffFactor = 2
|
||||||
|
|
||||||
# node2 is not started, so dialing will fail
|
# try to connect to peer that doesnt exist
|
||||||
let conn1 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
|
let conn1Ok = await nodes[0].peerManager.connectRelay(nonExistentPeer)
|
||||||
check:
|
check:
|
||||||
# Cannot connect to node2
|
# Cannot connect to node2
|
||||||
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
|
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect
|
||||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].peerInfo.peerId] == CannotConnect
|
nodes[0].peerManager.peerStore[ConnectionBook][nonExistentPeer.peerId] == CannotConnect
|
||||||
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 1
|
nodes[0].peerManager.peerStore[NumberFailedConnBook][nonExistentPeer.peerId] == 1
|
||||||
|
|
||||||
# And the connection failed
|
# Connection attempt failed
|
||||||
conn1.isNone() == true
|
conn1Ok == false
|
||||||
|
|
||||||
# Right after failing there is a backoff period
|
# Right after failing there is a backoff period
|
||||||
nodes[0].peerManager.peerStore.canBeConnected(
|
nodes[0].peerManager.peerStore.canBeConnected(
|
||||||
nodes[1].peerInfo.peerId,
|
nonExistentPeer.peerId,
|
||||||
nodes[0].peerManager.initialBackoffInSec,
|
nodes[0].peerManager.initialBackoffInSec,
|
||||||
nodes[0].peerManager.backoffFactor) == false
|
nodes[0].peerManager.backoffFactor) == false
|
||||||
|
|
||||||
@ -192,13 +208,11 @@ procSuite "Peer Manager":
|
|||||||
nodes[0].peerManager.initialBackoffInSec,
|
nodes[0].peerManager.initialBackoffInSec,
|
||||||
nodes[0].peerManager.backoffFactor) == true
|
nodes[0].peerManager.backoffFactor) == true
|
||||||
|
|
||||||
await nodes[1].start()
|
# After a successful connection, the number of failed connections is reset
|
||||||
await nodes[1].mountRelay()
|
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
|
||||||
|
let conn2Ok = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
|
||||||
# Now we can connect and failed count is reset
|
|
||||||
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
|
|
||||||
check:
|
check:
|
||||||
conn2.isNone() == false
|
conn2Ok == true
|
||||||
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0
|
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0
|
||||||
|
|
||||||
await allFutures(nodes.mapIt(it.stop()))
|
await allFutures(nodes.mapIt(it.stop()))
|
||||||
@ -217,7 +231,8 @@ procSuite "Peer Manager":
|
|||||||
await node1.mountRelay()
|
await node1.mountRelay()
|
||||||
await node2.mountRelay()
|
await node2.mountRelay()
|
||||||
|
|
||||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
require:
|
||||||
|
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
|
||||||
check:
|
check:
|
||||||
# Currently connected to node2
|
# Currently connected to node2
|
||||||
node1.peerManager.peerStore.peers().len == 1
|
node1.peerManager.peerStore.peers().len == 1
|
||||||
@ -265,7 +280,8 @@ procSuite "Peer Manager":
|
|||||||
await node2.mountRelay()
|
await node2.mountRelay()
|
||||||
node2.wakuRelay.codec = betaCodec
|
node2.wakuRelay.codec = betaCodec
|
||||||
|
|
||||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
|
require:
|
||||||
|
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
|
||||||
check:
|
check:
|
||||||
# Currently connected to node2
|
# Currently connected to node2
|
||||||
node1.peerManager.peerStore.peers().len == 1
|
node1.peerManager.peerStore.peers().len == 1
|
||||||
@ -353,9 +369,10 @@ procSuite "Peer Manager":
|
|||||||
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
# all nodes connect to peer 0
|
# all nodes connect to peer 0
|
||||||
discard await nodes[1].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
|
require:
|
||||||
discard await nodes[2].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
|
(await nodes[1].peerManager.connectRelay(peerInfos[0])) == true
|
||||||
discard await nodes[3].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds)
|
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
|
||||||
|
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# Peerstore track all three peers
|
# Peerstore track all three peers
|
||||||
|
|||||||
@ -138,9 +138,9 @@ suite "WakuNode":
|
|||||||
await node3.start()
|
await node3.start()
|
||||||
await node3.mountRelay()
|
await node3.mountRelay()
|
||||||
|
|
||||||
discard await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
|
discard await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())
|
||||||
await sleepAsync(3.seconds)
|
await sleepAsync(3.seconds)
|
||||||
discard await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
|
discard await node1.peerManager.connectRelay(node3.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# Verify that only the first connection succeeded
|
# Verify that only the first connection succeeded
|
||||||
|
|||||||
@ -116,9 +116,9 @@ suite "Waku Relay":
|
|||||||
await allFutures(srcSwitch.start(), dstSwitch.start())
|
await allFutures(srcSwitch.start(), dstSwitch.start())
|
||||||
|
|
||||||
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
||||||
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
|
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
|
||||||
require:
|
require:
|
||||||
conn.isSome()
|
connOk == true
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let networkTopic = "test-network1"
|
let networkTopic = "test-network1"
|
||||||
@ -174,9 +174,9 @@ suite "Waku Relay":
|
|||||||
await allFutures(srcSwitch.start(), dstSwitch.start())
|
await allFutures(srcSwitch.start(), dstSwitch.start())
|
||||||
|
|
||||||
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
||||||
let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec)
|
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
|
||||||
require:
|
require:
|
||||||
conn.isSome()
|
connOk == true
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let networkTopic = "test-network1"
|
let networkTopic = "test-network1"
|
||||||
|
|||||||
@ -221,8 +221,9 @@ suite "WakuNode - Relay":
|
|||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
|
|
||||||
# Connect nodes
|
# Connect nodes
|
||||||
let conn = await nodes[0].peerManager.dialPeer(nodes[1].switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec)
|
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
|
||||||
require conn.isSome
|
require:
|
||||||
|
connOk == true
|
||||||
|
|
||||||
# Node 1 subscribes to topic
|
# Node 1 subscribes to topic
|
||||||
nodes[1].subscribe(DefaultPubsubTopic)
|
nodes[1].subscribe(DefaultPubsubTopic)
|
||||||
|
|||||||
@ -45,8 +45,8 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
|||||||
debug "post_waku_v2_admin_v1_peers"
|
debug "post_waku_v2_admin_v1_peers"
|
||||||
|
|
||||||
for i, peer in peers:
|
for i, peer in peers:
|
||||||
let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc")
|
let connOk = await node.peerManager.connectRelay(parseRemotePeerInfo(peer), source="rpc")
|
||||||
if conn.isNone():
|
if not connOk:
|
||||||
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
|
raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|||||||
@ -87,29 +87,108 @@ proc insertOrReplace(ps: PeerStorage,
|
|||||||
warn "failed to store peers", err = res.error
|
warn "failed to store peers", err = res.error
|
||||||
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
||||||
|
|
||||||
proc dialPeer(pm: PeerManager, peerId: PeerID,
|
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
|
||||||
addrs: seq[MultiAddress], proto: string,
|
# Adds peer to manager for the specified protocol
|
||||||
dialTimeout = DefaultDialTimeout,
|
|
||||||
source = "api",
|
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||||
): Future[Option[Connection]] {.async.} =
|
# Do not attempt to manage our unmanageable self
|
||||||
|
return
|
||||||
|
|
||||||
|
# ...public key
|
||||||
|
var publicKey: PublicKey
|
||||||
|
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
|
||||||
|
|
||||||
|
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
|
||||||
|
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
|
||||||
|
# Peer already managed
|
||||||
|
return
|
||||||
|
|
||||||
|
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
||||||
|
|
||||||
|
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
||||||
|
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||||
|
|
||||||
|
# Add peer to storage. Entry will subsequently be updated with connectedness information
|
||||||
|
if not pm.storage.isNil:
|
||||||
|
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
|
||||||
|
|
||||||
|
# Connects to a given node. Note that this function uses `connect` and
|
||||||
|
# does not provide a protocol. Streams for relay (gossipsub) are created
|
||||||
|
# automatically without the needing to dial.
|
||||||
|
proc connectRelay*(pm: PeerManager,
|
||||||
|
peer: RemotePeerInfo,
|
||||||
|
dialTimeout = DefaultDialTimeout,
|
||||||
|
source = "api"): Future[bool] {.async.} =
|
||||||
|
|
||||||
|
let peerId = peer.peerId
|
||||||
|
|
||||||
# Do not attempt to dial self
|
# Do not attempt to dial self
|
||||||
if peerId == pm.switch.peerInfo.peerId:
|
if peerId == pm.switch.peerInfo.peerId:
|
||||||
return none(Connection)
|
return false
|
||||||
|
|
||||||
|
if not pm.peerStore.hasPeer(peerId, WakuRelayCodec):
|
||||||
|
pm.addPeer(peer)
|
||||||
|
|
||||||
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
||||||
debug "Dialing peer", wireAddr = addrs, peerId = peerId, failedAttempts=failedAttempts
|
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
|
||||||
|
|
||||||
|
var deadline = sleepAsync(dialTimeout)
|
||||||
|
var workfut = pm.switch.connect(peerId, peer.addrs)
|
||||||
|
var reasonFailed = ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
await workfut or deadline
|
||||||
|
if workfut.finished():
|
||||||
|
if not deadline.finished():
|
||||||
|
deadline.cancel()
|
||||||
|
waku_peers_dials.inc(labelValues = ["successful"])
|
||||||
|
waku_node_conns_initiated.inc(labelValues = [source])
|
||||||
|
pm.peerStore[NumberFailedConnBook][peerId] = 0
|
||||||
|
return true
|
||||||
|
else:
|
||||||
|
reasonFailed = "timed out"
|
||||||
|
await cancelAndWait(workfut)
|
||||||
|
except CatchableError as exc:
|
||||||
|
reasonFailed = "remote peer failed"
|
||||||
|
|
||||||
|
# Dial failed
|
||||||
|
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
|
||||||
|
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
|
||||||
|
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
||||||
|
|
||||||
|
debug "Connecting relay peer failed",
|
||||||
|
peerId = peerId,
|
||||||
|
reason = reasonFailed,
|
||||||
|
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
||||||
|
waku_peers_dials.inc(labelValues = [reasonFailed])
|
||||||
|
|
||||||
|
return false
|
||||||
|
|
||||||
|
# Dialing should be used for just protocols that require a stream to write and read
|
||||||
|
# This shall not be used to dial Relay protocols, since that would create
|
||||||
|
# unneccesary unused streams.
|
||||||
|
proc dialPeer(pm: PeerManager,
|
||||||
|
peerId: PeerID,
|
||||||
|
addrs: seq[MultiAddress],
|
||||||
|
proto: string,
|
||||||
|
dialTimeout = DefaultDialTimeout,
|
||||||
|
source = "api"): Future[Option[Connection]] {.async.} =
|
||||||
|
|
||||||
|
if peerId == pm.switch.peerInfo.peerId:
|
||||||
|
error "could not dial self"
|
||||||
|
return none(Connection)
|
||||||
|
|
||||||
|
if proto == WakuRelayCodec:
|
||||||
|
error "dial shall not be used to connect to relays"
|
||||||
|
return none(Connection)
|
||||||
|
|
||||||
|
debug "Dialing peer", wireAddr=addrs, peerId=peerId, proto=proto
|
||||||
|
|
||||||
# Dial Peer
|
# Dial Peer
|
||||||
let dialFut = pm.switch.dial(peerId, addrs, proto)
|
let dialFut = pm.switch.dial(peerId, addrs, proto)
|
||||||
|
|
||||||
var reasonFailed = ""
|
var reasonFailed = ""
|
||||||
try:
|
try:
|
||||||
if (await dialFut.withTimeout(dialTimeout)):
|
if (await dialFut.withTimeout(dialTimeout)):
|
||||||
waku_peers_dials.inc(labelValues = ["successful"])
|
|
||||||
# TODO: This will be populated from the peerstore info when ready
|
|
||||||
waku_node_conns_initiated.inc(labelValues = [source])
|
|
||||||
pm.peerStore[NumberFailedConnBook][peerId] = 0
|
|
||||||
return some(dialFut.read())
|
return some(dialFut.read())
|
||||||
else:
|
else:
|
||||||
reasonFailed = "timeout"
|
reasonFailed = "timeout"
|
||||||
@ -117,20 +196,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
reasonFailed = "failed"
|
reasonFailed = "failed"
|
||||||
|
|
||||||
# Dial failed
|
debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
|
||||||
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
|
|
||||||
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
|
|
||||||
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
|
||||||
|
|
||||||
debug "Dialing peer failed",
|
|
||||||
peerId = peerId,
|
|
||||||
reason = reasonFailed,
|
|
||||||
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
|
||||||
waku_peers_dials.inc(labelValues = [reasonFailed])
|
|
||||||
|
|
||||||
# Update storage
|
|
||||||
if not pm.storage.isNil:
|
|
||||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
|
|
||||||
|
|
||||||
return none(Connection)
|
return none(Connection)
|
||||||
|
|
||||||
@ -255,31 +321,6 @@ proc new*(T: type PeerManager,
|
|||||||
# Manager interface #
|
# Manager interface #
|
||||||
#####################
|
#####################
|
||||||
|
|
||||||
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
|
|
||||||
# Adds peer to manager for the specified protocol
|
|
||||||
|
|
||||||
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
|
||||||
# Do not attempt to manage our unmanageable self
|
|
||||||
return
|
|
||||||
|
|
||||||
# ...public key
|
|
||||||
var publicKey: PublicKey
|
|
||||||
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
|
|
||||||
|
|
||||||
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
|
|
||||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
|
|
||||||
# Peer already managed
|
|
||||||
return
|
|
||||||
|
|
||||||
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
|
||||||
|
|
||||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
|
||||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
|
||||||
|
|
||||||
# Add peer to storage. Entry will subsequently be updated with connectedness information
|
|
||||||
if not pm.storage.isNil:
|
|
||||||
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
|
|
||||||
|
|
||||||
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
|
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
|
||||||
# Do not add relay peers
|
# Do not add relay peers
|
||||||
if proto == WakuRelayCodec:
|
if proto == WakuRelayCodec:
|
||||||
@ -303,16 +344,16 @@ proc reconnectPeers*(pm: PeerManager,
|
|||||||
debug "Reconnecting peers", proto=proto
|
debug "Reconnecting peers", proto=proto
|
||||||
|
|
||||||
# Proto is not persisted, we need to iterate over all peers.
|
# Proto is not persisted, we need to iterate over all peers.
|
||||||
for storedInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
|
||||||
# Check that the peer can be connected
|
# Check that the peer can be connected
|
||||||
if storedInfo.connectedness == CannotConnect:
|
if peerInfo.connectedness == CannotConnect:
|
||||||
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
|
debug "Not reconnecting to unreachable or non-existing peer", peerId=peerInfo.peerId
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Respect optional backoff period where applicable.
|
# Respect optional backoff period where applicable.
|
||||||
let
|
let
|
||||||
# TODO: Add method to peerStore (eg isBackoffExpired())
|
# TODO: Add method to peerStore (eg isBackoffExpired())
|
||||||
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
|
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
|
||||||
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
|
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
|
||||||
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
|
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
|
||||||
|
|
||||||
@ -320,12 +361,11 @@ proc reconnectPeers*(pm: PeerManager,
|
|||||||
|
|
||||||
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
||||||
if backoffTime > ZeroDuration:
|
if backoffTime > ZeroDuration:
|
||||||
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
|
debug "Backing off before reconnect...", peerId=peerInfo.peerId, backoffTime=backoffTime
|
||||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||||
await sleepAsync(backoffTime)
|
await sleepAsync(backoffTime)
|
||||||
|
|
||||||
trace "Reconnecting to peer", peerId= $storedInfo.peerId
|
discard await pm.connectRelay(peerInfo)
|
||||||
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
|
||||||
|
|
||||||
####################
|
####################
|
||||||
# Dialer interface #
|
# Dialer interface #
|
||||||
@ -362,7 +402,6 @@ proc dialPeer*(pm: PeerManager,
|
|||||||
|
|
||||||
proc connectToNodes*(pm: PeerManager,
|
proc connectToNodes*(pm: PeerManager,
|
||||||
nodes: seq[string]|seq[RemotePeerInfo],
|
nodes: seq[string]|seq[RemotePeerInfo],
|
||||||
proto: string,
|
|
||||||
dialTimeout = DefaultDialTimeout,
|
dialTimeout = DefaultDialTimeout,
|
||||||
source = "api") {.async.} =
|
source = "api") {.async.} =
|
||||||
if nodes.len == 0:
|
if nodes.len == 0:
|
||||||
@ -370,14 +409,14 @@ proc connectToNodes*(pm: PeerManager,
|
|||||||
|
|
||||||
info "Dialing multiple peers", numOfPeers = nodes.len
|
info "Dialing multiple peers", numOfPeers = nodes.len
|
||||||
|
|
||||||
var futConns: seq[Future[Option[Connection]]]
|
var futConns: seq[Future[bool]]
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
let node = when node is string: parseRemotePeerInfo(node)
|
let node = when node is string: parseRemotePeerInfo(node)
|
||||||
else: node
|
else: node
|
||||||
futConns.add(pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source))
|
futConns.add(pm.connectRelay(node))
|
||||||
|
|
||||||
await allFutures(futConns)
|
await allFutures(futConns)
|
||||||
let successfulConns = futConns.mapIt(it.read()).countIt(it.isSome)
|
let successfulConns = futConns.mapIt(it.read()).countIt(true)
|
||||||
|
|
||||||
info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len
|
info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len
|
||||||
|
|
||||||
@ -414,7 +453,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
|||||||
notConnectedPeers = notConnectedPeers.len,
|
notConnectedPeers = notConnectedPeers.len,
|
||||||
outsideBackoffPeers = outsideBackoffPeers.len
|
outsideBackoffPeers = outsideBackoffPeers.len
|
||||||
|
|
||||||
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
|
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
|
||||||
|
|
||||||
proc prunePeerStore*(pm: PeerManager) =
|
proc prunePeerStore*(pm: PeerManager) =
|
||||||
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
|
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
|
||||||
|
|||||||
@ -380,8 +380,8 @@ proc info*(node: WakuNode): WakuInfo =
|
|||||||
|
|
||||||
proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
|
proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
|
||||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||||
# NOTE This is dialing on WakuRelay protocol specifically
|
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
|
||||||
await peer_manager.connectToNodes(node.peerManager, nodes, WakuRelayCodec, source=source)
|
await connectToNodes(node.peerManager, nodes, source=source)
|
||||||
|
|
||||||
|
|
||||||
## Waku relay
|
## Waku relay
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user