Feat/peer manager improvements (#367)

* Add managed peers to Admin API result

* Deal with dial failures
This commit is contained in:
Hanno Cornelius 2021-02-05 12:49:11 +02:00 committed by GitHub
parent 6c7515115d
commit 1f5c3cc621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 134 additions and 9 deletions

View File

@ -18,6 +18,7 @@ import
admin_api, admin_api,
private_api], private_api],
../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
@ -366,7 +367,59 @@ procSuite "Waku v2 JSON-RPC API":
server.close() server.close()
waitfor node.stop() waitfor node.stop()
asyncTest "Admin API: get peer information": asyncTest "Admin API: get managed peer information":
# Create a couple of nodes
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
peerInfo2 = node2.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004))
peerInfo3 = node3.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
node1.mountRelay()
node2.mountRelay()
node3.mountRelay()
# Dial nodes 2 and 3 from node1
await node1.dialPeer(constructMultiaddrStr(peerInfo2))
await node1.dialPeer(constructMultiaddrStr(peerInfo3))
# RPC server setup
let
rpcPort = Port(8545)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
installAdminApiHandlers(node1, server)
server.start()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort)
let response = await client.get_waku_v2_admin_v1_peers()
check:
response.len == 2
# Check peer 2
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo2))
# Check peer 3
response.anyIt(it.protocol == WakuRelayCodec and
it.multiaddr == constructMultiaddrStr(peerInfo3))
server.stop()
server.close()
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Admin API: get unmanaged peer information":
const cTopic = ContentTopic(1) const cTopic = ContentTopic(1)
waitFor node.start() waitFor node.start()

View File

@ -15,7 +15,7 @@ import
../test_helpers ../test_helpers
procSuite "Peer Manager": procSuite "Peer Manager":
asyncTest "Peer dialing": asyncTest "Peer dialing works":
let let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
@ -31,7 +31,7 @@ procSuite "Peer Manager":
node2.mountRelay() node2.mountRelay()
# Dial node2 from node1 # Dial node2 from node1
let conn = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec) let conn = (await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec)).get()
# Check connection # Check connection
check: check:
@ -45,4 +45,30 @@ procSuite "Peer Manager":
# Check connectedness # Check connectedness
check: check:
node1.peerManager.connectedness(peerInfo2.peerId) node1.peerManager.connectedness(peerInfo2.peerId)
await allFutures([node1.stop(), node2.stop()])
asyncTest "Dialing fails gracefully":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
peerInfo2 = node2.peerInfo
await node1.start()
# Purposefully don't start node2
node1.mountRelay()
node2.mountRelay()
# Dial node2 from node1
let connOpt = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
# Check connection failed gracefully
check:
connOpt.isNone()
await node1.stop()

View File

@ -1,20 +1,25 @@
{.push raises: [Exception, Defect].} {.push raises: [Exception, Defect].}
import import
std/[options,sequtils], std/[options, sequtils, sets],
json_rpc/rpcserver, json_rpc/rpcserver,
libp2p/[peerinfo, switch], libp2p/[peerinfo, switch],
../../protocol/waku_store/[waku_store_types, waku_store], ../../protocol/waku_store/[waku_store_types, waku_store],
../../protocol/waku_swap/[waku_swap_types, waku_swap], ../../protocol/waku_swap/[waku_swap_types, waku_swap],
../../protocol/waku_filter/[waku_filter_types, waku_filter], ../../protocol/waku_filter/[waku_filter_types, waku_filter],
../wakunode2, ../wakunode2,
../peer_manager,
./jsonrpc_types ./jsonrpc_types
export jsonrpc_types export jsonrpc_types
proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string =
# Constructs a multiaddress with both wire address and p2p identity
$wireaddr & "/p2p/" & $peerId
proc constructMultiaddrStr*(peerInfo: PeerInfo): string = proc constructMultiaddrStr*(peerInfo: PeerInfo): string =
# Constructs a multiaddress with both location address and p2p identity # Constructs a multiaddress with both location (wire) address and p2p identity
$peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId constructMultiaddrStr(peerInfo.addrs[0], peerInfo.peerId)
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
@ -29,6 +34,18 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
var wPeers: seq[WakuPeer] = @[] var wPeers: seq[WakuPeer] = @[]
## Managed peers
if not node.wakuRelay.isNil:
# Map all managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: toSeq(it.protos.items)[0],
connected: node.peerManager.connectedness(it.peerId))),
wPeers.len) # Append to the end of the sequence
## Unmanaged peers
## @TODO add these peers to peer manager
if not node.wakuSwap.isNil: if not node.wakuSwap.isNil:
# Map WakuSwap peers to WakuPeers and add to return list # Map WakuSwap peers to WakuPeers and add to return list
wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo),

View File

@ -1,15 +1,26 @@
{.push raises: [Defect, Exception].} {.push raises: [Defect, Exception].}
import import
chronos, chronicles, std/options,
chronos, chronicles, metrics,
libp2p/standard_setup, libp2p/standard_setup,
libp2p/peerstore libp2p/peerstore
export peerstore, standard_setup
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
logScope:
topics = "wakupeers"
type type
PeerManager* = ref object of RootObj PeerManager* = ref object of RootObj
switch*: Switch switch*: Switch
peerStore*: PeerStore peerStore*: PeerStore
const
defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
proc new*(T: type PeerManager, switch: Switch): PeerManager = proc new*(T: type PeerManager, switch: Switch): PeerManager =
T(switch: switch, T(switch: switch,
peerStore: PeerStore.new()) peerStore: PeerStore.new())
@ -18,7 +29,7 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager =
# Dialer interface # # Dialer interface #
#################### ####################
proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Connection] {.async.} = proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers # Dial a given peer and add it to the list of known peers
# @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed. # @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed.
@ -43,7 +54,25 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Conne
# Dial Peer # Dial Peer
# @TODO Keep track of conn and connected state in peer store # @TODO Keep track of conn and connected state in peer store
return await pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto) let dialFut = pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto)
try:
# Attempt to dial remote peer
if (await dialFut.withTimeout(dialTimeout)):
waku_peers_dials.inc(labelValues = ["successful"])
return some(dialFut.read())
else:
# @TODO any redial attempts?
# @TODO indicate CannotConnect on peer metadata
debug "Dialing remote peer timed out"
waku_peers_dials.inc(labelValues = ["timeout"])
return none(Connection)
except CatchableError as e:
# @TODO any redial attempts?
# @TODO indicate CannotConnect on peer metadata
debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"])
return none(Connection)
##################### #####################
# Manager interface # # Manager interface #