mirror of https://github.com/waku-org/nwaku.git
Feat/pm connection tracking (#377)
* Track connectedness state in peer manager
parent bf0eab4a48 · commit 3b6db72287
@@ -47,7 +47,7 @@ procSuite "Peer Manager":
 
     # Check connectedness
     check:
-      node1.peerManager.connectedness(peerInfo2.peerId)
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
 
     await allFutures([node1.stop(), node2.stop()])
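The check changes because connectedness no longer returns a bool: it now returns an enum value, and a Nim enum is not implicitly usable as a boolean condition, so the test compares against the expected member. A minimal, self-contained illustration of that distinction (only the enum name comes from this commit; the rest is illustrative):

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected

  let state = Connected
  # A bare `check: state` would no longer compile for an enum value;
  # the updated test compares explicitly instead.
  assert state == Connectedness.Connected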
@@ -118,3 +118,45 @@ procSuite "Peer Manager":
       it.protos.contains(WakuStoreCodec))
 
     await node.stop()
+
+  asyncTest "Peer manager keeps track of connections":
+    let
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
+        Port(60000))
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
+        Port(60002))
+      peerInfo2 = node2.peerInfo
+
+    await node1.start()
+
+    node1.mountRelay()
+    node2.mountRelay()
+
+    # Test default connectedness for new peers
+    node1.peerManager.addPeer(peerInfo2, WakuRelayCodec)
+    check:
+      # No information about node2's connectedness
+      node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+
+    # Purposefully don't start node2
+    # Attempt dialing node2 from node1
+    discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
+    check:
+      # Cannot connect to node2
+      node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
+
+    # Successful connection
+    await node2.start()
+    discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds)
+    check:
+      # Currently connected to node2
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    # Stop node. Gracefully disconnect from all peers.
+    await node1.stop()
+    check:
+      # Not currently connected to node2, but had recent, successful connection.
+      node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
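The new test walks the full lifecycle the peer manager is expected to record: a freshly added peer is NotConnected, a failed dial marks it CannotConnect, a successful dial marks it Connected, and a graceful shutdown leaves it at CanConnect. That lifecycle can be modeled with nothing but the standard library; the sketch below is illustrative only (peer ids are plain strings here, whereas the real code keys on libp2p peer ids):

  import std/tables

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected

  var book = initTable[string, Connectedness]()

  proc connectedness(book: Table[string, Connectedness], peerId: string): Connectedness =
    # Unknown peers default to NotConnected, mirroring the manager's behaviour for newly added peers.
    book.getOrDefault(peerId, NotConnected)

  let peer = "node2"
  assert book.connectedness(peer) == NotConnected  # newly added peer, no information yet
  book[peer] = CannotConnect                       # dial attempt failed
  book[peer] = Connected                           # dial succeeded
  book[peer] = CanConnect                          # graceful disconnect on shutdown
  assert book.connectedness(peer) == CanConnect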
@@ -42,7 +42,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
       wPeers.insert(node.peerManager.peers(WakuRelayCodec)
                     .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                     protocol: WakuRelayCodec,
-                                    connected: node.peerManager.connectedness(it.peerId))),
+                                    connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                     wPeers.len) # Append to the end of the sequence
 
     if not node.wakuFilter.isNil:
@@ -50,7 +50,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
       wPeers.insert(node.peerManager.peers(WakuFilterCodec)
                     .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                     protocol: WakuFilterCodec,
-                                    connected: node.peerManager.connectedness(it.peerId))),
+                                    connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                     wPeers.len) # Append to the end of the sequence
 
     if not node.wakuSwap.isNil:
@@ -58,7 +58,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
       wPeers.insert(node.peerManager.peers(WakuSwapCodec)
                     .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                     protocol: WakuSwapCodec,
-                                    connected: node.peerManager.connectedness(it.peerId))),
+                                    connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                     wPeers.len) # Append to the end of the sequence
 
     if not node.wakuStore.isNil:
@@ -66,7 +66,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
       wPeers.insert(node.peerManager.peers(WakuStoreCodec)
                     .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                     protocol: WakuStoreCodec,
-                                    connected: node.peerManager.connectedness(it.peerId))),
+                                    connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
                     wPeers.len) # Append to the end of the sequence
 
     # @TODO filter output on protocol/connected-status
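All four admin handlers change in the same way: the JSON-RPC peer record keeps its boolean connected field, so the richer enum is collapsed at the RPC boundary, with only Connectedness.Connected reported as true and every other state (including CanConnect) as false. A small illustrative sketch of that mapping, using a stand-in record rather than the project's actual WakuPeer type:

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected

    RpcPeer = object  # stand-in for the RPC-facing WakuPeer record
      multiaddr: string
      protocol: string
      connected: bool

  proc toRpcPeer(multiaddr, protocol: string, state: Connectedness): RpcPeer =
    # Collapse the four-state enum into the bool the RPC response expects.
    RpcPeer(multiaddr: multiaddr, protocol: protocol,
            connected: state == Connected)

  assert not toRpcPeer("/ip4/127.0.0.1/tcp/60002/p2p/QmExample", "waku-relay", CanConnect).connected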
@@ -14,16 +14,54 @@ logScope:
   topics = "wakupeers"
 
 type
+  Connectedness* = enum
+    # NotConnected: default state for a new peer. No connection and no further information on connectedness.
+    NotConnected,
+    # CannotConnect: attempted to connect to peer, but failed.
+    CannotConnect,
+    # CanConnect: was recently connected to peer and disconnected gracefully.
+    CanConnect,
+    # Connected: actively connected to peer.
+    Connected
+
+  ConnectionBook* = object of PeerBook[Connectedness]
+
+  WakuPeerStore* = ref object of PeerStore
+    connectionBook*: ConnectionBook
+
   PeerManager* = ref object of RootObj
     switch*: Switch
-    peerStore*: PeerStore
+    peerStore*: WakuPeerStore
 
 const
   defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
 
+proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
+  case event.kind
+  of ConnEventKind.Connected:
+    pm.peerStore.connectionBook.set(peerId, Connected)
+    return
+  of ConnEventKind.Disconnected:
+    pm.peerStore.connectionBook.set(peerId, CanConnect)
+    return
+
+proc new*(T: type WakuPeerStore): WakuPeerStore =
+  var p: WakuPeerStore
+  new(p)
+  return p
+
 proc new*(T: type PeerManager, switch: Switch): PeerManager =
-  T(switch: switch,
-    peerStore: PeerStore.new())
+  let pm = PeerManager(switch: switch,
+                       peerStore: WakuPeerStore.new())
+
+  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
+    onConnEvent(pm, peerId, event)
+
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
+  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
+
+  return pm
 
 ####################
 # Helper functions #
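The structural change here is that the peer store gains a per-peer connection book which connection events keep up to date: the switch notifies the manager whenever a peer connects or disconnects, and the manager records Connected or CanConnect accordingly (failed dials record CannotConnect in dialPeer, shown further down). The following dependency-free sketch models that shape; all type and proc names are stand-ins, and a plain proc call replaces libp2p's ConnEvent handler registration:

  import std/tables

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected

    ConnEventKind = enum
      EventConnected, EventDisconnected  # illustrative names, to avoid clashing with Connectedness members

    ConnectionBook = object
      book: Table[string, Connectedness]

    PeerManagerSketch = ref object
      connectionBook: ConnectionBook

  proc set(cb: var ConnectionBook, peerId: string, state: Connectedness) =
    cb.book[peerId] = state

  proc onConnEvent(pm: PeerManagerSketch, peerId: string, kind: ConnEventKind) =
    # Connected peers are marked Connected; peers that disconnect are remembered
    # as CanConnect, since they were reachable a moment ago.
    case kind
    of EventConnected: pm.connectionBook.set(peerId, Connected)
    of EventDisconnected: pm.connectionBook.set(peerId, CanConnect)

  let pm = PeerManagerSketch(connectionBook: ConnectionBook(book: initTable[string, Connectedness]()))
  pm.onConnEvent("peerA", EventConnected)
  pm.onConnEvent("peerA", EventDisconnected)
  assert pm.connectionBook.book["peerA"] == CanConnect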
@@ -46,7 +84,7 @@ proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
   # Return the known info for all peers registered on the specified protocol
   pm.peers.filterIt(it.protos.contains(proto))
 
-proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
+proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
   # Return the connection state of the given, managed peer
   # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
   # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
@@ -55,9 +93,9 @@ proc connectedness*(pm: PeerManager, peerId: PeerId): bool =
 
   if (storedInfo == StoredInfo()):
     # Peer is not managed, therefore not connected
-    return false
+    return NotConnected
   else:
-    pm.switch.isConnected(peerId)
+    pm.peerStore.connectionBook.get(peerId)
 
 proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
   # Returns `true` if peer is included in manager for the specified protocol
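With the richer return type a caller can distinguish a peer it has never tried from one that recently failed or recently succeeded. A hypothetical caller-side sketch of acting on that distinction (the redial policy below is illustrative only and is not part of this commit):

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected

  proc shouldAttemptDial(state: Connectedness): bool =
    # Dial peers we have never tried or that were reachable recently;
    # skip peers that are already connected or whose last attempt failed.
    case state
    of NotConnected, CanConnect: return true
    of Connected, CannotConnect: return false

  assert shouldAttemptDial(CanConnect)
  assert not shouldAttemptDial(CannotConnect)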
@@ -123,10 +161,12 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
     # @TODO indicate CannotConnect on peer metadata
     debug "Dialing remote peer timed out"
     waku_peers_dials.inc(labelValues = ["timeout"])
+    pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
     return none(Connection)
   except CatchableError as e:
     # @TODO any redial attempts?
     # @TODO indicate CannotConnect on peer metadata
     debug "Dialing remote peer failed", msg = e.msg
     waku_peers_dials.inc(labelValues = ["failed"])
+    pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect)
     return none(Connection)
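Both failure paths in dialPeer now record CannotConnect before returning, so a later connectedness query reflects the failed attempt instead of the NotConnected default. A synchronous, dependency-free sketch of the same pattern; the dial proc and its exception type are stand-ins for the real chronos-based dial with timeout:

  import std/tables

  type
    Connectedness = enum
      NotConnected, CannotConnect, CanConnect, Connected
    DialError = object of CatchableError  # stand-in for the real failure modes

  var connectionBook = initTable[string, Connectedness]()

  proc dial(peerId: string) =
    # Stand-in for the real dial: always fails here so the error path is exercised.
    raise newException(DialError, "connection refused")

  proc tryDial(peerId: string) =
    try:
      dial(peerId)
      connectionBook[peerId] = Connected
    except DialError:
      # Remember the failed attempt, mirroring the connectionBook.set calls above.
      connectionBook[peerId] = CannotConnect

  tryDial("unreachablePeer")
  assert connectionBook["unreachablePeer"] == CannotConnect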
@@ -394,7 +394,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
   let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_store_errors.inc(labelValues = [dialFailure])
     return
@@ -433,7 +433,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
   let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_store_errors.inc(labelValues = [dialFailure])
     return
@@ -93,7 +93,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} =
   let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec)
 
   if peerOpt.isNone():
-    error "failed to connect to remote peer"
+    error "no suitable remote peers"
     waku_swap_errors.inc(labelValues = [dialFailure])
     return
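The reworded log message in the store and swap protocols reflects where this path actually fails: selectPeer returns an Option and yields none when no peer supporting the protocol is registered, before any dialing happens. A small illustrative sketch of that guard using std/options; selectPeer here is a hypothetical stand-in that picks the first matching entry from a plain sequence:

  import std/options

  type
    StoredPeer = object
      peerId: string
      proto: string

  proc selectPeer(peers: seq[StoredPeer], proto: string): Option[StoredPeer] =
    # Return the first peer registered for the requested protocol, if any.
    for p in peers:
      if p.proto == proto:
        return some(p)
    return none(StoredPeer)

  let peers = @[StoredPeer(peerId: "peerA", proto: "waku-relay")]
  let peerOpt = peers.selectPeer("waku-store")
  if peerOpt.isNone():
    echo "no suitable remote peers"  # matches the updated error message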