From 5042b062153a685e8f9838b18357fb3a5464708e Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Wed, 14 Dec 2022 16:04:11 +0100 Subject: [PATCH] chore(p2p): unify dialpeer functions (#1458) --- waku/v2/node/peer_manager/peer_manager.nim | 88 +++++++++------------- waku/v2/node/waku_node.nim | 2 +- 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 29d40239c..e1c2f377d 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -18,6 +18,7 @@ import export waku_peer_store, peer_storage, peers declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] +# TODO: Populate from PeerStore.Source when ready declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"] @@ -31,8 +32,9 @@ type peerStore*: PeerStore storage: PeerStorage -let - defaultDialTimeout = chronos.minutes(1) # TODO: should this be made configurable? +const + # TODO: Make configurable + DefaultDialTimeout = chronos.seconds(10) #################### # Helper functions # @@ -51,7 +53,14 @@ proc insertOrReplace(ps: PeerStorage, proc dialPeer(pm: PeerManager, peerId: PeerID, addrs: seq[MultiAddress], proto: string, - dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = + dialTimeout = DefaultDialTimeout, + source = "api" + ): Future[Option[Connection]] {.async.} = + + # Do not attempt to dial self + if peerId == pm.switch.peerInfo.peerId: + return none(Connection) + info "Dialing peer from manager", wireAddr = addrs, peerId = peerId # Dial Peer @@ -59,8 +68,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, try: # Attempt to dial remote peer - if (await dialFut.withTimeout(dialTimeout)): + if (await dialFut.withTimeout(DefaultDialTimeout)): waku_peers_dials.inc(labelValues = ["successful"]) + # TODO: This will be populated from the peerstore info when ready + waku_node_conns_initiated.inc(labelValues = [source]) return some(dialFut.read()) else: # TODO: any redial attempts? @@ -224,7 +235,11 @@ proc reconnectPeers*(pm: PeerManager, # Dialer interface # #################### -proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = +proc dialPeer*(pm: PeerManager, + remotePeerInfo: RemotePeerInfo, + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api"): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers # TODO: check peer validity and score before continuing. Limit number of peers to be managed. @@ -233,44 +248,31 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto pm.addPeer(remotePeerInfo, proto) - if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: - # Do not attempt to dial self - return none(Connection) + return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source) - return await pm.dialPeer(remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout) - -proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = +proc dialPeer*(pm: PeerManager, + peerId: PeerID, + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api" + ): Future[Option[Connection]] {.async.} = # Dial an existing peer by looking up it's existing addrs in the switch's peerStore # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - if peerId == pm.switch.peerInfo.peerId: - # Do not attempt to dial self - return none(Connection) - let addrs = pm.switch.peerStore[AddressBook][peerId] + return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source) - return await pm.dialPeer(peerId, addrs, proto, dialTimeout) - -proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - info "Connecting to node", remotePeer = remotePeer, source = source - - info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId - let connOpt = await pm.dialPeer(remotePeer, proto) - - if connOpt.isSome(): - info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId - waku_node_conns_initiated.inc(labelValues = [source]) - else: - error "Failed to connect to peer", wireAddr= $remotePeer.addrs[0], peerId= $remotePeer.peerId - waku_peers_errors.inc(labelValues = ["conn_init_failure"]) - -proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) +proc connectToNodes*(pm: PeerManager, + nodes: seq[string]|seq[RemotePeerInfo], + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api") {.async.} = info "connectToNodes", len = nodes.len - for nodeId in nodes: - await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source) + for node in nodes: + let node = when node is string: parseRemotePeerInfo(node) + else: node + discard await pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source) # The issue seems to be around peers not being fully connected when # trying to subscribe. So what we do is sleep to guarantee nodes are @@ -279,19 +281,3 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source # This issue was known to Dmitiry on nim-libp2p and may be resolvable # later. await sleepAsync(chronos.seconds(5)) - -proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - info "connectToNodes", len = nodes.len - - for remotePeerInfo in nodes: - await connectToNode(pm, remotePeerInfo, proto, source) - - # The issue seems to be around peers not being fully connected when - # trying to subscribe. So what we do is sleep to guarantee nodes are - # fully connected. - # - # This issue was known to Dmitiry on nim-libp2p and may be resolvable - # later. - await sleepAsync(chronos.seconds(5)) - diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index a82e74771..cc3f56ef0 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -253,7 +253,7 @@ proc info*(node: WakuNode): WakuInfo = proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = ## `source` indicates source of node addrs (static config, api call, discovery, etc) # NOTE This is dialing on WakuRelay protocol specifically - await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source) + await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source=source) ## Waku relay