feat(networking): add service slots to peer manager (#1473)

This commit is contained in:
Alvaro Revuelta 2023-01-26 10:20:20 +01:00 committed by GitHub
parent 95d31b3ed3
commit ea4703e9a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 187 additions and 45 deletions

View File

@ -40,6 +40,9 @@ import
../../waku/v2/protocol/waku_archive/retention_policy,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/utils/peers,
../../waku/v2/utils/wakuenr,
@ -401,7 +404,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.storenode != "":
try:
mountStoreClient(node)
setStorePeer(node, conf.storenode)
let storenode = parseRemotePeerInfo(conf.storenode)
node.peerManager.addServicePeer(storenode, WakuStoreCodec)
except:
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
@ -415,7 +419,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.lightpushnode != "":
try:
mountLightPushClient(node)
setLightPushPeer(node, conf.lightpushnode)
let lightpushnode = parseRemotePeerInfo(conf.lightpushnode)
node.peerManager.addServicePeer(lightpushnode, WakuLightPushCodec)
except:
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
@ -429,7 +434,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.filternode != "":
try:
await mountFilterClient(node)
setFilterPeer(node, conf.filternode)
let filternode = parseRemotePeerInfo(conf.filternode)
node.peerManager.addServicePeer(filternode, WakuFilterCodec)
except:
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())
@ -442,7 +448,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.peerExchangeNode != "":
try:
setPeerExchangePeer(node, conf.peerExchangeNode)
let peerExchangeNode = parseRemotePeerInfo(conf.peerExchangeNode)
node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec)
except:
return err("failed to set node waku peer-exchange peer: " & getCurrentExceptionMsg())
@ -498,6 +505,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
if conf.keepAlive:
node.startKeepalive()
# Maintain relay connections
if conf.relay:
node.peerManager.start()
return ok()
when defined(waku_exp_store_resume):

View File

@ -15,7 +15,8 @@ import
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/message
libp2p/protocols/pubsub/rpc/message,
libp2p/builders
import
../../waku/common/sqlite,
../../waku/v2/node/peer_manager/peer_manager,
@ -24,6 +25,8 @@ import
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers,
./testlib/testutils
@ -401,3 +404,93 @@ procSuite "Peer Manager":
nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer store addServicePeer() stores service peers":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60932))
peer1 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & "1")
peer2 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30301/p2p/" & basePeerId & "2")
peer3 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30302/p2p/" & basePeerId & "3")
peer4 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "4")
peer5 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "5")
# service peers
node.peerManager.addServicePeer(peer1, WakuStoreCodec)
node.peerManager.addServicePeer(peer2, WakuFilterCodec)
node.peerManager.addServicePeer(peer3, WakuLightPushCodec)
node.peerManager.addServicePeer(peer4, WakuPeerExchangeCodec)
# relay peers (should not be added)
node.peerManager.addServicePeer(peer5, WakuRelayCodec)
# all peers are stored in the peerstore
check:
node.peerManager.peerStore.peers().anyIt(it.peerId == peer1.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer2.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer3.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer4.peerId)
# but the relay peer is not
node.peerManager.peerStore.peers().anyIt(it.peerId == peer5.peerId) == false
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peer1.peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peer2.peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peer3.peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peer4.peerId
# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise().build(),
storage = nil)
# Create 3 peer infos
let peers = toSeq(1..3).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]
# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer1.isSome() == true
selectedPeer1.get().peerId == peers[0].peerId
# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
# And return none if we dont have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer3.isSome() == false
# Now we add service peers for different protocols peer[1..3]
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLightPushCodec)
# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId
let selectedPeer5 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId

View File

@ -267,16 +267,6 @@ suite "Extended nim-libp2p Peer Store":
peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))
test "selectPeer() returns if a peer supports a given protocol":
# When
let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")
# Then
check:
swapPeer.isSome()
swapPeer.get().peerId == p5
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
test "getPeersByDirection()":
# When
let inPeers = peerStore.getPeersByDirection(Inbound)

View File

@ -13,7 +13,7 @@ import
../../utils/time,
../waku_node,
../peer_manager/peer_manager,
./jsonrpc_types,
./jsonrpc_types,
./jsonrpc_utils
export jsonrpc_types
@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")
@ -52,7 +52,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)")
let res = queryFut.read()
if res.isErr():
raise newException(ValueError, $res.error)

View File

@ -40,10 +40,10 @@ const
InitialBackoffInSec = 120
BackoffFactor = 4
# limit the amount of paralel dials
# Limit the amount of paralel dials
MaxParalelDials = 10
# delay between consecutive relayConnectivityLoop runs
# Delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(30)
type
@ -54,6 +54,8 @@ type
backoffFactor*: int
maxFailedAttempts*: int
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
started: bool
####################
# Helper functions #
@ -105,7 +107,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
debug "Dialing peer failed", peerId = peerId, reason = reasonFailed, failedAttempts=failedAttempts
debug "Dialing peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])
# Update storage
@ -192,13 +197,14 @@ proc new*(T: type PeerManager,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
maxFailedAttempts: maxFailedAttempts)
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
pm.serviceSlots = initTable[string, RemotePeerInfo]()
if not storage.isNil():
debug "found persistent peer storage"
pm.loadFromStorage() # Load previously managed peers.
@ -239,6 +245,20 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
warn "Can't add relay peer to service peers slots"
return
info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo
# TODO: Remove proto once fully refactored
pm.addPeer(remotePeerInfo, proto)
proc reconnectPeers*(pm: PeerManager,
proto: string,
protocolMatcher: Matcher,
@ -335,7 +355,8 @@ proc connectToNodes*(pm: PeerManager,
# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
while true:
debug "Starting relay connectivity loop"
while pm.started:
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
@ -364,3 +385,37 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
await sleepAsync(ConnectivityLoopInterval)
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "Selecting peer from peerstore", protocol=proto
# Selects the best peer for a given protocol
let peers = pm.peerStore.getPeersByProtocol(proto)
# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)
# For other protocols, we select the peer that is slotted for the given protocol
pm.serviceSlots.withValue(proto, serviceSlot):
debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto
return some(serviceSlot[])
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)
proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()
proc stop*(pm: PeerManager) =
pm.started = false

View File

@ -152,6 +152,10 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
# Returns `true` if the peer is connected
peerStore.connectedness(peerId) == Connected
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
# TODO: What if peer does not exist in the peerStore?
@ -165,20 +169,11 @@ proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
# Returns `true` if the peerstore has any peer matching the protocolMatcher
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
let peers = peerStore.peers().filterIt(it.protos.contains(proto))
if peers.len >= 1:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toRemotePeerInfo())
else:
return none(RemotePeerInfo)
proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] =
return peerStore.peers.filterIt(it.direction == direction)
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)
proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
return peerStore.peers.filterIt(it.protos.contains(proto))

View File

@ -391,9 +391,6 @@ proc startRelay*(node: WakuNode) {.async.} =
protocolMatcher(WakuRelayCodec),
backoffPeriod)
# Maintain relay connections
asyncSpawn node.peerManager.relayConnectivityLoop()
# Start the WakuRelay protocol
await node.wakuRelay.start()
@ -529,7 +526,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
@ -544,7 +541,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
@ -702,7 +699,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")
@ -791,7 +788,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
error "failed to publish message", error="waku lightpush client is nil"
return
let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec)
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "failed to publish message", error="no suitable remote peers"
return
@ -1008,5 +1005,6 @@ proc stop*(node: WakuNode) {.async.} =
discard await node.stopDiscv5()
await node.switch.stop()
node.peerManager.stop()
node.started = false

View File

@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut
return ok()
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
return ok()
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)

View File

@ -210,7 +210,7 @@ when defined(waku_exp_store_resume):
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec)
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])