mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-18 10:52:26 +00:00
341 lines
14 KiB
Nim
341 lines
14 KiB
Nim
{.push raises: [Defect].}
|
|
|
|
import
|
|
std/[options, sets, sequtils, times],
|
|
chronos, chronicles, metrics,
|
|
libp2p/multistream,
|
|
./waku_peer_store,
|
|
../storage/peer/peer_storage,
|
|
../../utils/peers
|
|
|
|
export waku_peer_store, peer_storage, peers
|
|
|
|
# Metrics maintained by the peer manager. The trailing array names the single
# label each metric is incremented with in this module.
declareCounter(waku_peers_dials, "Number of peer dials", ["outcome"])
declarePublicCounter(waku_node_conns_initiated, "Number of connections initiated", ["source"])
declarePublicGauge(waku_peers_errors, "Number of peer manager errors", ["type"])

# Log topic applied to the log statements of this module.
logScope:
  topics = "wakupeers"
|
|
|
|
type
  PeerManager* = ref object of RootObj
    ## State shared by all peer-management procs in this module.
    switch*: Switch
      ## Used to dial peers and to register connection event handlers.
    peerStore*: WakuPeerStore
      ## In-memory books: addresses, protocols, keys, connectedness and
      ## disconnect times, all keyed by peer id.
    storage: PeerStorage
      ## Optional persistent backend; `nil` when no storage was supplied.
|
|
|
|
const
  # Default upper bound on how long a single dial attempt may take. It is
  # never reassigned, so it is declared as a compile-time constant.
  # @TODO should this be made configurable?
  defaultDialTimeout = chronos.minutes(1)
|
|
|
|
####################
|
|
# Helper functions #
|
|
####################
|
|
|
|
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
  ## Builds a `RemotePeerInfo` from a peer store entry: the peer id is taken
  ## as is, the stored addresses and protocols are converted to sequences.
  let
    knownAddrs = toSeq(storedInfo.addrs)
    knownProtos = toSeq(storedInfo.protos)

  result = RemotePeerInfo.init(
    peerId = storedInfo.peerId,
    addrs = knownAddrs,
    protocols = knownProtos)
|
|
|
|
proc insertOrReplace(ps: PeerStorage,
                     peerId: PeerID,
                     storedInfo: StoredInfo,
                     connectedness: Connectedness,
                     disconnectTime: int64 = 0) =
  ## Writes the peer entry to persistent storage via `ps.put`, inserting it or
  ## replacing an existing entry with the updated info.
  ##
  ## A failed write is not propagated to the caller: it is logged as a warning
  ## and counted in `waku_peers_errors` under the `storage_failure` label.
  let outcome = ps.put(peerId, storedInfo, connectedness, disconnectTime)
  if not outcome.isErr:
    return

  warn "failed to store peers", err = outcome.error
  waku_peers_errors.inc(labelValues = ["storage_failure"])
|
|
|
|
proc dialPeer(pm: PeerManager, peerId: PeerID,
              addrs: seq[MultiAddress], proto: string,
              dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  ## Dials `peerId` on `addrs` for protocol `proto` through the switch and
  ## waits at most `dialTimeout` for the dial to complete.
  ##
  ## Returns `some(connection)` when the dial completes in time. On a timeout
  ## or a `CatchableError` the peer is marked `CannotConnect` in the peer store
  ## (and in persistent storage, when one is configured) and `none` is
  ## returned. Every outcome is counted in `waku_peers_dials`.
  info "Dialing peer from manager", wireAddr = addrs, peerId = peerId

  let pendingDial = pm.switch.dial(peerId, addrs, proto)

  try:
    if await pendingDial.withTimeout(dialTimeout):
      waku_peers_dials.inc(labelValues = ["successful"])
      return some(pendingDial.read())

    # @TODO any redial attempts?
    debug "Dialing remote peer timed out"
    waku_peers_dials.inc(labelValues = ["timeout"])
  except CatchableError as e:
    # @TODO any redial attempts?
    debug "Dialing remote peer failed", msg = e.msg
    waku_peers_dials.inc(labelValues = ["failed"])

  # Only reached when the dial timed out or failed: remember that this peer
  # could not be reached.
  pm.peerStore.connectionBook[peerId] = CannotConnect
  if not pm.storage.isNil:
    pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)

  return none(Connection)
|
|
|
|
proc loadFromStorage(pm: PeerManager) =
  ## Copies every entry returned by `pm.storage.getAll` into the in-memory
  ## peer store. The caller must ensure `pm.storage` is not nil.
  ##
  ## The persisted connectedness is only logged; each loaded peer starts out
  ## as `NotConnected`. A failing query is logged and counted in
  ## `waku_peers_errors` under `storage_load_failure`.
  debug "loading peers from storage"

  proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
    trace "loading peer", peerId=peerId, storedInfo=storedInfo, connectedness=connectedness

    # Our own entry is never managed, so only foreign peers are restored
    if peerId != pm.switch.peerInfo.peerId:
      pm.peerStore.addressBook[peerId] = storedInfo.addrs
      pm.peerStore.protoBook[peerId] = storedInfo.protos
      pm.peerStore.keyBook[peerId] = storedInfo.publicKey
      pm.peerStore.connectionBook[peerId] = NotConnected  # Reset connectedness state
      pm.peerStore.disconnectBook[peerId] = disconnectTime

  let queryOutcome = pm.storage.getAll(onData)

  if not queryOutcome.isErr:
    debug "successfully queried peer storage"
  else:
    warn "failed to load peers from storage", err = queryOutcome.error
    waku_peers_errors.inc(labelValues = ["storage_load_failure"])
|
|
|
|
##################
|
|
# Initialisation #
|
|
##################
|
|
|
|
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
  ## Connection event handler: records the new connectedness of `peerId` in
  ## the peer store and, when persistent storage is configured, in storage.
  ##
  ## A connect event maps to `Connected`. A disconnect event maps to
  ## `CanConnect` and additionally persists the current Unix time as the
  ## disconnect time. Peers without an address book entry are ignored.
  let tracked = pm.peerStore.addressBook.contains(peerId)
  if not tracked:
    ## We only consider connection events if we
    ## already track some addresses for this peer
    return

  var
    newState: Connectedness
    disconnectedAt: int64 = 0  # same value `insertOrReplace` defaults to

  case event.kind
  of ConnEventKind.Connected:
    newState = Connected
  of ConnEventKind.Disconnected:
    newState = CanConnect
    disconnectedAt = getTime().toUnix

  pm.peerStore.connectionBook[peerId] = newState
  if not pm.storage.isNil:
    pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), newState, disconnectedAt)
|
|
|
|
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
  ## Creates a peer manager on top of `switch` with a fresh `WakuPeerStore`.
  ##
  ## Registers a handler for connect and disconnect events on the switch and,
  ## when `storage` is not nil, loads the previously persisted peers from it.
  let pm = PeerManager(
    switch: switch,
    peerStore: WakuPeerStore.new(),
    storage: storage)

  debug "creating new PeerManager"

  # Forwards switch connection events to `onConnEvent` for this manager
  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
    onConnEvent(pm, peerId, event)

  for eventKind in [ConnEventKind.Connected, ConnEventKind.Disconnected]:
    pm.switch.addConnEventHandler(peerHook, eventKind)

  if storage.isNil:
    debug "no peer storage found"
  else:
    debug "found persistent peer storage"
    pm.loadFromStorage()  # Load previously managed peers.

  pm
|
|
|
|
#####################
|
|
# Manager interface #
|
|
#####################
|
|
|
|
proc peers*(pm: PeerManager): seq[StoredInfo] =
  ## Returns the stored info of every peer in the manager's peer store.
  result = pm.peerStore.peers()
|
|
|
|
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
  ## Returns the stored info of every peer whose protocol set contains `proto`.
  for info in pm.peers():
    if info.protos.contains(proto):
      result.add(info)
|
|
|
|
proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
  ## Returns the stored info of every peer that has at least one protocol
  ## accepted by `protocolMatcher`.
  for info in pm.peers():
    if info.protos.anyIt(protocolMatcher(it)):
      result.add(info)
|
|
|
|
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
  ## Returns the connection state recorded for `peerId`.
  ##
  ## A peer whose peer store entry equals a default `StoredInfo()` is treated
  ## as unmanaged and reported as `NotConnected`.
  # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
  # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
  let isManaged = pm.peerStore.get(peerId) != StoredInfo()

  result =
    if isManaged:
      pm.peerStore.connectionBook[peerId]
    else:
      # Peer is not managed, therefore not connected
      NotConnected
|
|
|
|
proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
  ## `true` when the peer store entry of `peerId` lists `proto` among its
  ## protocols.
  let entry = pm.peerStore.get(peerId)
  result = entry.protos.contains(proto)
|
|
|
|
proc hasPeers*(pm: PeerManager, proto: string): bool =
  ## `true` when at least one managed peer lists `proto` among its protocols.
  for info in pm.peers():
    if info.protos.contains(proto):
      return true
  return false
|
|
|
|
proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
  ## `true` when at least one managed peer has a protocol accepted by
  ## `protocolMatcher`.
  for info in pm.peers():
    if info.protos.anyIt(protocolMatcher(it)):
      return true
  return false
|
|
|
|
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
  ## Adds `remotePeerInfo` to the manager for protocol `proto`.
  ##
  ## Records the peer's addresses, the public key extracted from its peer id
  ## and `proto` in the peer store. When persistent storage is configured the
  ## resulting entry is written there as `NotConnected`. Our own peer id is
  ## ignored.

  if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
    # Do not attempt to manage our unmanageable self
    return

  # Log the full address list. Indexing `addrs[0]` here raised an IndexDefect
  # for peers that are added without any known address.
  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addrs = remotePeerInfo.addrs, proto = proto

  # ...known addresses
  for multiaddr in remotePeerInfo.addrs:
    pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)

  # ...public key
  # Best effort: the result of the extraction is ignored, so `publicKey` keeps
  # its default value when the extraction does not fill it in.
  var publicKey: PublicKey
  discard remotePeerInfo.peerId.extractPublicKey(publicKey)

  pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey

  # ...associated protocols
  pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)

  # Add peer to storage. Entry will subsequently be updated with connectedness information
  if not pm.storage.isNil:
    pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
|
|
|
|
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
  ## Picks a peer for `proto`, or `none` when no managed peer supports it.
  let candidates = pm.peers().filterIt(it.protos.contains(proto))

  if candidates.len < 1:
    return none(RemotePeerInfo)

  # @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
  return some(candidates[0].toRemotePeerInfo())
|
|
|
|
proc reconnectPeers*(pm: PeerManager,
                     proto: string,
                     protocolMatcher: Matcher,
                     backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
  ## Reconnect to peers registered for this protocol. This will update connectedness.
  ## Especially useful to resume connections from persistent storage after a restart.
  ##
  ## Peers are selected with `protocolMatcher` and dialed one after another on
  ## `proto`. Peers marked `CannotConnect` are skipped. When `backoff` has not
  ## yet elapsed since a peer's recorded disconnect time, the proc sleeps for
  ## the remainder before dialing that peer.

  debug "Reconnecting peers", proto=proto

  for storedInfo in pm.peers(protocolMatcher):
    # Check if peer is reachable.
    if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect:
      debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
      continue

    # Respect optional backoff period where applicable.
    let
      disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second)  # Convert
      currentTime = Moment.init(getTime().toUnix, Second)  # Current time comparable to persisted value
      backoffTime = disconnectTime + backoff - currentTime  # Consider time elapsed since last disconnect

    trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime

    if backoffTime > ZeroDuration:
      debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
      # We disconnected recently and still need to wait for a backoff period before connecting
      await sleepAsync(backoffTime)

    # Add to protos for peer, if it has not been added yet
    if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
      let remotePeerInfo = storedInfo.toRemotePeerInfo()
      # Log the full address list. Indexing `addrs[0]` raised an IndexDefect
      # for stored peers without any address.
      trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addrs = remotePeerInfo.addrs, proto = proto
      pm.addPeer(remotePeerInfo, proto)

    trace "Reconnecting to peer", peerId=storedInfo.peerId
    discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
|
|
|
####################
|
|
# Dialer interface #
|
|
####################
|
|
|
|
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  ## Dials the given peer on `proto`, first adding it to the manager for that
  ## protocol if it is not yet registered there.
  ##
  ## Returns `some(connection)` on success and `none` when the dial fails,
  ## times out after `dialTimeout`, or when the peer is ourselves.
  # @TODO check peer validity and score before continuing. Limit number of peers to be managed.

  # Check for self first: we are never added to the manager, so there is
  # nothing to register or log before bailing out.
  if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
    # Do not attempt to dial self
    return none(Connection)

  # Add dialed peer info to peer store, if it does not exist yet...
  if not pm.hasPeer(remotePeerInfo.peerId, proto):
    # Log the full address list. Indexing `addrs[0]` raised an IndexDefect
    # for peers without any address.
    trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addrs = remotePeerInfo.addrs, proto = proto
    pm.addPeer(remotePeerInfo, proto)

  return await pm.dialPeer(remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout)
|
|
|
|
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  ## Dials an already known peer on `proto`, using the addresses recorded for
  ## it in the address book of the switch's peer store.
  ##
  ## Returns `none` without dialing when `peerId` is our own peer id.
  # @TODO check peer validity and score before continuing. Limit number of peers to be managed.
  let dialingSelf = peerId == pm.switch.peerInfo.peerId
  if dialingSelf:
    # Do not attempt to dial self
    return none(Connection)

  let knownAddrs = pm.switch.peerStore[AddressBook][peerId]
  let connOpt = await pm.dialPeer(peerId, knownAddrs, proto, dialTimeout)
  return connOpt
|
|
|
|
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
  ## Dials `remotePeer` on `proto` and records the outcome in metrics.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  ## and is used as the label of `waku_node_conns_initiated` on success. A
  ## failed dial is logged as an error and counted in `waku_peers_errors`
  ## under `conn_init_failure`.
  info "Connecting to node", remotePeer = remotePeer, source = source

  # All log statements below report the full address list. Indexing `addrs[0]`
  # raised an IndexDefect for peers without any address.
  info "Attempting dial", wireAddr = remotePeer.addrs, peerId = remotePeer.peerId
  let connOpt = await pm.dialPeer(remotePeer, proto)

  if connOpt.isSome():
    info "Successfully connected to peer", wireAddr = remotePeer.addrs, peerId = remotePeer.peerId
    waku_node_conns_initiated.inc(labelValues = [source])
  else:
    error "Failed to connect to peer", wireAddr = remotePeer.addrs, peerId = remotePeer.peerId
    waku_peers_errors.inc(labelValues = ["conn_init_failure"])
|
|
|
|
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
  ## Parses each entry of `nodes` with `parseRemotePeerInfo` and connects to
  ## the resulting peers one after another on `proto`.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for nodeId in nodes:
    let remotePeer = parseRemotePeerInfo(nodeId)
    await pm.connectToNode(remotePeer, proto, source)

  # Peers may not be fully connected yet by the time callers try to
  # subscribe, so wait a fixed five seconds before returning. This is a
  # workaround for an issue reported as known on the nim-libp2p side and
  # may become unnecessary later.
  let settleTime = chronos.seconds(5)
  await sleepAsync(settleTime)
|
|
|
|
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
  ## Connects to each peer in `nodes` one after another on `proto`.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for remotePeer in nodes:
    await pm.connectToNode(remotePeer, proto, source)

  # Peers may not be fully connected yet by the time callers try to
  # subscribe, so wait a fixed five seconds before returning. This is a
  # workaround for an issue reported as known on the nim-libp2p side and
  # may become unnecessary later.
  let settleTime = chronos.seconds(5)
  await sleepAsync(settleTime)
|
|
|