chore(p2p): unify dialpeer functions (#1458)

This commit is contained in:
Alvaro Revuelta 2022-12-14 16:04:11 +01:00 committed by GitHub
parent 226b44c86d
commit 5042b06215
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 52 deletions

View File

@ -18,6 +18,7 @@ import
export waku_peer_store, peer_storage, peers
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
@ -31,8 +32,9 @@ type
peerStore*: PeerStore
storage: PeerStorage
let
defaultDialTimeout = chronos.minutes(1) # TODO: should this be made configurable?
const
# TODO: Make configurable
DefaultDialTimeout = chronos.seconds(10)
####################
# Helper functions #
@ -51,7 +53,14 @@ proc insertOrReplace(ps: PeerStorage,
proc dialPeer(pm: PeerManager, peerId: PeerID,
addrs: seq[MultiAddress], proto: string,
dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
dialTimeout = DefaultDialTimeout,
source = "api"
): Future[Option[Connection]] {.async.} =
# Do not attempt to dial self
if peerId == pm.switch.peerInfo.peerId:
return none(Connection)
info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
# Dial Peer
@ -59,8 +68,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
try:
# Attempt to dial remote peer
if (await dialFut.withTimeout(dialTimeout)):
if (await dialFut.withTimeout(DefaultDialTimeout)):
waku_peers_dials.inc(labelValues = ["successful"])
# TODO: This will be populated from the peerstore info when ready
waku_node_conns_initiated.inc(labelValues = [source])
return some(dialFut.read())
else:
# TODO: any redial attempts?
@ -224,7 +235,11 @@ proc reconnectPeers*(pm: PeerManager,
# Dialer interface #
####################
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
proc dialPeer*(pm: PeerManager,
remotePeerInfo: RemotePeerInfo,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
@ -233,44 +248,31 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto
pm.addPeer(remotePeerInfo, proto)
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source)
return await pm.dialPeer(remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout)
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
proc dialPeer*(pm: PeerManager,
peerId: PeerID,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"
): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up it's existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
if peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
let addrs = pm.switch.peerStore[AddressBook][peerId]
return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source)
return await pm.dialPeer(peerId, addrs, proto, dialTimeout)
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "Connecting to node", remotePeer = remotePeer, source = source
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
let connOpt = await pm.dialPeer(remotePeer, proto)
if connOpt.isSome():
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_conns_initiated.inc(labelValues = [source])
else:
error "Failed to connect to peer", wireAddr= $remotePeer.addrs[0], peerId= $remotePeer.peerId
waku_peers_errors.inc(labelValues = ["conn_init_failure"])
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
proc connectToNodes*(pm: PeerManager,
nodes: seq[string]|seq[RemotePeerInfo],
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api") {.async.} =
info "connectToNodes", len = nodes.len
for nodeId in nodes:
await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source)
for node in nodes:
let node = when node is string: parseRemotePeerInfo(node)
else: node
discard await pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source)
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
@ -279,19 +281,3 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
# later.
await sleepAsync(chronos.seconds(5))
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len
for remotePeerInfo in nodes:
await connectToNode(pm, remotePeerInfo, proto, source)
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
# fully connected.
#
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
# later.
await sleepAsync(chronos.seconds(5))

View File

@ -253,7 +253,7 @@ proc info*(node: WakuNode): WakuInfo =
proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE This is dialing on WakuRelay protocol specifically
await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source)
await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source=source)
## Waku relay