mirror of https://github.com/waku-org/nwaku.git
feat: shard aware peer management (#2151)
This commit is contained in:
parent
42f1957920
commit
dba9820c1f
|
@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[strformat, strutils, times, json, options, random]
|
||||
import std/[strformat, strutils, times, options, random]
|
||||
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
||||
eth/keys, bearssl, stew/[byteutils, results],
|
||||
nimcrypto/pbkdf2,
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||
|
@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi
|
|||
peerinfo, # manage the information of a peer, such as peer ID and public / private key
|
||||
peerid, # Implement how peers interact
|
||||
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
|
||||
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
|
||||
nameresolving/dnsresolver]# define DNS resolution
|
||||
import
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_lightpush/rpc,
|
||||
../../waku/waku_filter,
|
||||
../../waku/waku_enr,
|
||||
|
|
|
@ -53,7 +53,7 @@ import
|
|||
../../waku/waku_peer_exchange,
|
||||
../../waku/waku_rln_relay,
|
||||
../../waku/waku_store,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_filter,
|
||||
../../waku/waku_filter_v2,
|
||||
./wakunode2_validator_signed,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils, times],
|
||||
std/[options, sequtils, times, sugar],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
|
@ -21,10 +21,12 @@ import
|
|||
../../waku/node/peer_manager/peer_manager,
|
||||
../../waku/node/peer_manager/peer_store/waku_peer_storage,
|
||||
../../waku/waku_node,
|
||||
../../waku/waku_relay,
|
||||
../../waku/waku_store,
|
||||
../../waku/waku_filter,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_enr/capabilities,
|
||||
../../waku/waku_relay/protocol,
|
||||
../../waku/waku_store/common,
|
||||
../../waku/waku_filter/protocol,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_peer_exchange,
|
||||
../../waku/waku_metadata,
|
||||
./testlib/common,
|
||||
|
@ -128,7 +130,6 @@ procSuite "Peer Manager":
|
|||
|
||||
await node.stop()
|
||||
|
||||
|
||||
asyncTest "Peer manager keeps track of connections":
|
||||
# Create 2 nodes
|
||||
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||
|
@ -225,18 +226,34 @@ procSuite "Peer Manager":
|
|||
let
|
||||
database = SqliteDatabase.new(":memory:")[]
|
||||
storage = WakuPeerStorage.new(database)[]
|
||||
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
|
||||
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
node1 = newTestWakuNode(
|
||||
generateSecp256k1Key(),
|
||||
ValidIpAddress.init("127.0.0.1"),
|
||||
Port(44048),
|
||||
peerStorage = storage
|
||||
)
|
||||
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))
|
||||
|
||||
node1.mountMetadata(0).expect("Mounted Waku Metadata")
|
||||
node2.mountMetadata(0).expect("Mounted Waku Metadata")
|
||||
|
||||
await node1.start()
|
||||
await node2.start()
|
||||
|
||||
await node1.mountRelay()
|
||||
await node2.mountRelay()
|
||||
|
||||
let peerInfo2 = node2.switch.peerInfo
|
||||
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
|
||||
remotePeerInfo2.enr = some(node2.enr)
|
||||
|
||||
require:
|
||||
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
|
||||
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
|
||||
assert is12Connected == true, "Node 1 and 2 not connected"
|
||||
|
||||
check:
|
||||
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
|
||||
|
||||
# wait for the peer store update
|
||||
await sleepAsync(chronos.milliseconds(500))
|
||||
|
||||
check:
|
||||
|
@ -246,10 +263,17 @@ procSuite "Peer Manager":
|
|||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
|
||||
let node3 = newTestWakuNode(
|
||||
generateSecp256k1Key(),
|
||||
ValidIpAddress.init("127.0.0.1"),
|
||||
Port(56037),
|
||||
peerStorage = storage
|
||||
)
|
||||
|
||||
node3.mountMetadata(0).expect("Mounted Waku Metadata")
|
||||
|
||||
await node3.start()
|
||||
|
||||
check:
|
||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
|
@ -257,7 +281,10 @@ procSuite "Peer Manager":
|
|||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
await node3.mountRelay()
|
||||
await node3.peerManager.connectToRelayPeers()
|
||||
|
||||
await node3.peerManager.manageRelayPeers()
|
||||
|
||||
await sleepAsync(chronos.milliseconds(500))
|
||||
|
||||
check:
|
||||
# Reconnected to node2 after "restart"
|
||||
|
@ -297,9 +324,9 @@ procSuite "Peer Manager":
|
|||
topics = @["/waku/2/rs/4/0"],
|
||||
)
|
||||
|
||||
discard node1.mountMetadata(clusterId3)
|
||||
discard node2.mountMetadata(clusterId4)
|
||||
discard node3.mountMetadata(clusterId4)
|
||||
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
|
||||
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
|
||||
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
|
||||
|
||||
# Start nodes
|
||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
|
@ -318,7 +345,6 @@ procSuite "Peer Manager":
|
|||
conn2.isNone
|
||||
conn3.isSome
|
||||
|
||||
|
||||
# TODO: nwaku/issues/1377
|
||||
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
|
||||
let
|
||||
|
@ -377,14 +403,28 @@ procSuite "Peer Manager":
|
|||
|
||||
asyncTest "Peer manager connects to all peers supporting a given protocol":
|
||||
# Create 4 nodes
|
||||
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
||||
let nodes =
|
||||
toSeq(0..<4)
|
||||
.mapIt(
|
||||
newTestWakuNode(
|
||||
nodeKey = generateSecp256k1Key(),
|
||||
bindIp = ValidIpAddress.init("0.0.0.0"),
|
||||
bindPort = Port(0),
|
||||
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
|
||||
)
|
||||
)
|
||||
|
||||
# Start them
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
discard nodes.mapIt(it.mountMetadata(0))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
|
||||
# Get all peer infos
|
||||
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||
let peerInfos = collect:
|
||||
for i in 0..nodes.high:
|
||||
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
|
||||
peerInfo.enr = some(nodes[i].enr)
|
||||
peerInfo
|
||||
|
||||
# Add all peers (but self) to node 0
|
||||
nodes[0].peerManager.addPeer(peerInfos[1])
|
||||
|
@ -392,7 +432,7 @@ procSuite "Peer Manager":
|
|||
nodes[0].peerManager.addPeer(peerInfos[3])
|
||||
|
||||
# Connect to relay peers
|
||||
await nodes[0].peerManager.connectToRelayPeers()
|
||||
await nodes[0].peerManager.manageRelayPeers()
|
||||
|
||||
check:
|
||||
# Peerstore track all three peers
|
||||
|
|
|
@ -11,6 +11,7 @@ import
|
|||
../../waku/node/peer_manager,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_lightpush/client,
|
||||
../../waku/waku_lightpush/protocol_metrics,
|
||||
../../waku/waku_lightpush/rpc,
|
||||
|
|
|
@ -4,16 +4,12 @@ import
|
|||
std/options,
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/switch
|
||||
chronos
|
||||
import
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_node,
|
||||
./testlib/common,
|
||||
./testlib/wakucore,
|
||||
./testlib/wakunode
|
||||
|
||||
|
|
|
@ -32,7 +32,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
|
|||
dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")],
|
||||
nat: "any",
|
||||
maxConnections: 50,
|
||||
topics: @[],
|
||||
clusterId: 1.uint32,
|
||||
topics: @["/waku/2/rs/1/0"],
|
||||
relay: true
|
||||
)
|
||||
|
||||
|
@ -55,8 +56,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
|
|||
dns4DomainName = none(string),
|
||||
discv5UdpPort = none(Port),
|
||||
agentString = none(string),
|
||||
clusterId: uint32 = 2.uint32,
|
||||
topics: seq[string] = @["/waku/2/rs/2/0"],
|
||||
clusterId: uint32 = 1.uint32,
|
||||
topics: seq[string] = @["/waku/2/rs/1/0"],
|
||||
peerStoreCapacity = none(int)): WakuNode =
|
||||
|
||||
var resolvedExtIp = extIp
|
||||
|
@ -66,7 +67,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
|
|||
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
|
||||
else: extPort
|
||||
|
||||
let conf = defaultTestWakuNodeConf()
|
||||
var conf = defaultTestWakuNodeConf()
|
||||
|
||||
conf.clusterId = clusterId
|
||||
conf.topics = topics
|
||||
|
||||
if dns4DomainName.isSome() and extIp.isNone():
|
||||
# If there's an error resolving the IP, an exception is thrown and test fails
|
||||
|
|
|
@ -10,11 +10,10 @@ import
|
|||
|
||||
import
|
||||
../../waku/waku_api/message_cache,
|
||||
../../waku/common/base64,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_node,
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_api/rest/server,
|
||||
../../waku/waku_api/rest/client,
|
||||
../../waku/waku_api/rest/responses,
|
||||
|
|
|
@ -5,7 +5,7 @@ else:
|
|||
|
||||
|
||||
import
|
||||
std/[options, sets, sequtils, times, strutils, math],
|
||||
std/[options, sugar, sets, sequtils, times, strutils, math],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
|
@ -17,6 +17,10 @@ import
|
|||
../../waku_core,
|
||||
../../waku_relay,
|
||||
../../waku_enr/sharding,
|
||||
../../waku_enr/capabilities,
|
||||
../../waku_store/common,
|
||||
../../waku_filter_v2/common,
|
||||
../../waku_lightpush/common,
|
||||
../../waku_metadata,
|
||||
./peer_store/peer_storage,
|
||||
./waku_peer_store
|
||||
|
@ -49,10 +53,10 @@ const
|
|||
BackoffFactor = 4
|
||||
|
||||
# Limit the amount of paralel dials
|
||||
MaxParalelDials = 10
|
||||
MaxParallelDials = 10
|
||||
|
||||
# Delay between consecutive relayConnectivityLoop runs
|
||||
ConnectivityLoopInterval = chronos.seconds(15)
|
||||
ConnectivityLoopInterval = chronos.minutes(1)
|
||||
|
||||
# How often the peer store is pruned
|
||||
PrunePeerStoreInterval = chronos.minutes(10)
|
||||
|
@ -115,22 +119,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
|
|||
# Do not attempt to manage our unmanageable self
|
||||
return
|
||||
|
||||
# ...public key
|
||||
var publicKey: PublicKey
|
||||
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
|
||||
|
||||
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
|
||||
pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
|
||||
# Peer already managed and ENR info is already saved
|
||||
return
|
||||
|
||||
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
|
||||
|
||||
|
||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
|
||||
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
|
||||
|
||||
|
||||
if remotePeerInfo.protocols.len > 0:
|
||||
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
|
||||
|
||||
if remotePeerInfo.enr.isSome():
|
||||
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
|
||||
|
||||
|
@ -158,27 +161,31 @@ proc connectRelay*(pm: PeerManager,
|
|||
pm.addPeer(peer)
|
||||
|
||||
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
||||
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
|
||||
debug "Connecting to relay peer",
|
||||
wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
|
||||
|
||||
var deadline = sleepAsync(dialTimeout)
|
||||
var workfut = pm.switch.connect(peerId, peer.addrs)
|
||||
var reasonFailed = ""
|
||||
let workfut = pm.switch.connect(peerId, peer.addrs)
|
||||
|
||||
# Can't use catch: with .withTimeout() in this case
|
||||
let res = catch: await workfut or deadline
|
||||
|
||||
try:
|
||||
await workfut or deadline
|
||||
if workfut.finished():
|
||||
let reasonFailed =
|
||||
if not workfut.finished():
|
||||
await workfut.cancelAndWait()
|
||||
"timed out"
|
||||
elif res.isErr(): res.error.msg
|
||||
else:
|
||||
if not deadline.finished():
|
||||
deadline.cancel()
|
||||
await deadline.cancelAndWait()
|
||||
|
||||
waku_peers_dials.inc(labelValues = ["successful"])
|
||||
waku_node_conns_initiated.inc(labelValues = [source])
|
||||
pm.peerStore[NumberFailedConnBook][peerId] = 0
|
||||
return true
|
||||
else:
|
||||
reasonFailed = "timed out"
|
||||
await cancelAndWait(workfut)
|
||||
except CatchableError as exc:
|
||||
reasonFailed = "remote peer failed"
|
||||
|
||||
pm.peerStore[NumberFailedConnBook][peerId] = 0
|
||||
|
||||
return true
|
||||
|
||||
# Dial failed
|
||||
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
|
||||
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
|
||||
|
@ -214,15 +221,15 @@ proc dialPeer(pm: PeerManager,
|
|||
|
||||
# Dial Peer
|
||||
let dialFut = pm.switch.dial(peerId, addrs, proto)
|
||||
var reasonFailed = ""
|
||||
try:
|
||||
if (await dialFut.withTimeout(dialTimeout)):
|
||||
|
||||
let res = catch:
|
||||
if await dialFut.withTimeout(dialTimeout):
|
||||
return some(dialFut.read())
|
||||
else:
|
||||
reasonFailed = "timeout"
|
||||
await cancelAndWait(dialFut)
|
||||
except CatchableError as exc:
|
||||
reasonFailed = "failed"
|
||||
else: await cancelAndWait(dialFut)
|
||||
|
||||
let reasonFailed =
|
||||
if res.isOk: "timed out"
|
||||
else: res.error.msg
|
||||
|
||||
debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
|
||||
|
||||
|
@ -293,105 +300,108 @@ proc canBeConnected*(pm: PeerManager,
|
|||
let now = Moment.init(getTime().toUnix, Second)
|
||||
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
|
||||
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
|
||||
if now >= (lastFailed + backoff):
|
||||
return true
|
||||
return false
|
||||
|
||||
return now >= (lastFailed + backoff)
|
||||
|
||||
##################
|
||||
# Initialisation #
|
||||
##################
|
||||
|
||||
proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
|
||||
if pm.switch.connManager.getConnections().hasKey(peerId):
|
||||
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
|
||||
if conns.len != 0:
|
||||
let observedAddr = conns[0].connection.observedAddr
|
||||
let ip = observedAddr.get.getHostname()
|
||||
if observedAddr.isSome:
|
||||
# TODO: think if circuit relay ips should be handled differently
|
||||
let ip = observedAddr.get.getHostname()
|
||||
return some(ip)
|
||||
return none(string)
|
||||
if not pm.switch.connManager.getConnections().hasKey(peerId):
|
||||
return none(string)
|
||||
|
||||
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
|
||||
if conns.len == 0:
|
||||
return none(string)
|
||||
|
||||
let obAddr = conns[0].connection.observedAddr.valueOr:
|
||||
return none(string)
|
||||
|
||||
# TODO: think if circuit relay ips should be handled differently
|
||||
|
||||
return some(obAddr.getHostname())
|
||||
|
||||
# called when a connection i) is created or ii) is closed
|
||||
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||
case event.kind
|
||||
of ConnEventKind.Connected:
|
||||
let direction = if event.incoming: Inbound else: Outbound
|
||||
discard
|
||||
of ConnEventKind.Disconnected:
|
||||
discard
|
||||
of ConnEventKind.Connected:
|
||||
#let direction = if event.incoming: Inbound else: Outbound
|
||||
discard
|
||||
of ConnEventKind.Disconnected:
|
||||
discard
|
||||
|
||||
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
||||
# To prevent metadata protocol from breaking prev nodes, by now we only
|
||||
# disconnect if the clusterid is specified.
|
||||
if pm.wakuMetadata.clusterId == 0:
|
||||
return
|
||||
|
||||
let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)
|
||||
|
||||
var reason: string
|
||||
block guardClauses:
|
||||
let conn = res.valueOr:
|
||||
reason = "dial failed: " & error.msg
|
||||
break guardClauses
|
||||
|
||||
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
|
||||
reason = "waku metatdata request failed: " & error
|
||||
break guardClauses
|
||||
|
||||
let clusterId = metadata.clusterId.valueOr:
|
||||
reason = "empty cluster-id reported"
|
||||
break guardClauses
|
||||
|
||||
if pm.wakuMetadata.clusterId != clusterId:
|
||||
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
|
||||
break guardClauses
|
||||
|
||||
if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
|
||||
reason = "no shards in common"
|
||||
break guardClauses
|
||||
|
||||
return
|
||||
|
||||
info "disconnecting from peer", peerId=peerId, reason=reason
|
||||
asyncSpawn(pm.switch.disconnect(peerId))
|
||||
pm.peerStore.delete(peerId)
|
||||
|
||||
# called when a peer i) first connects to us ii) disconnects all connections from us
|
||||
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
|
||||
await pm.onPeerMetadata(peerId)
|
||||
|
||||
var direction: PeerDirection
|
||||
var connectedness: Connectedness
|
||||
|
||||
if event.kind == PeerEventKind.Joined:
|
||||
direction = if event.initiator: Outbound else: Inbound
|
||||
connectedness = Connected
|
||||
case event.kind:
|
||||
of Joined:
|
||||
direction = if event.initiator: Outbound else: Inbound
|
||||
connectedness = Connected
|
||||
|
||||
var clusterOk = false
|
||||
var reason = ""
|
||||
# To prevent metadata protocol from breaking prev nodes, by now we only
|
||||
# disconnect if the clusterid is specified.
|
||||
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
|
||||
block wakuMetadata:
|
||||
var conn: Connection
|
||||
try:
|
||||
conn = await pm.switch.dial(peerId, WakuMetadataCodec)
|
||||
except CatchableError:
|
||||
reason = "waku metadata codec not supported: " & getCurrentExceptionMsg()
|
||||
break wakuMetadata
|
||||
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
|
||||
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
|
||||
|
||||
# request metadata from connecting peer
|
||||
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
|
||||
reason = "failed waku metadata codec request"
|
||||
break wakuMetadata
|
||||
|
||||
# does not report any clusterId
|
||||
let clusterId = metadata.clusterId.valueOr:
|
||||
reason = "empty clusterId reported"
|
||||
break wakuMetadata
|
||||
|
||||
# drop it if it doesnt match our network id
|
||||
if pm.wakuMetadata.clusterId != clusterId:
|
||||
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
|
||||
break wakuMetadata
|
||||
|
||||
# reaching here means the clusterId matches
|
||||
clusterOk = true
|
||||
|
||||
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
|
||||
info "disconnecting from peer", peerId=peerId, reason=reason
|
||||
asyncSpawn(pm.switch.disconnect(peerId))
|
||||
pm.peerStore.delete(peerId)
|
||||
|
||||
# TODO: Take action depending on the supported shards as reported by metadata
|
||||
|
||||
let ip = pm.getPeerIp(peerId)
|
||||
if ip.isSome:
|
||||
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
|
||||
|
||||
let peersBehindIp = pm.ipTable[ip.get]
|
||||
if peersBehindIp.len > pm.colocationLimit:
|
||||
# in theory this should always be one, but just in case
|
||||
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
|
||||
let peersBehindIp = pm.ipTable[ip.get]
|
||||
|
||||
let idx = max((peersBehindIp.len - pm.colocationLimit), 0)
|
||||
for peerId in peersBehindIp[0..<idx]:
|
||||
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
|
||||
asyncSpawn(pm.switch.disconnect(peerId))
|
||||
pm.peerStore.delete(peerId)
|
||||
of Left:
|
||||
direction = UnknownDirection
|
||||
connectedness = CanConnect
|
||||
|
||||
elif event.kind == PeerEventKind.Left:
|
||||
direction = UnknownDirection
|
||||
connectedness = CanConnect
|
||||
|
||||
# note we cant access the peerId ip here as the connection was already closed
|
||||
for ip, peerIds in pm.ipTable.pairs:
|
||||
if peerIds.contains(peerId):
|
||||
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
|
||||
if pm.ipTable[ip].len == 0:
|
||||
pm.ipTable.del(ip)
|
||||
break
|
||||
# note we cant access the peerId ip here as the connection was already closed
|
||||
for ip, peerIds in pm.ipTable.pairs:
|
||||
if peerIds.contains(peerId):
|
||||
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
|
||||
if pm.ipTable[ip].len == 0:
|
||||
pm.ipTable.del(ip)
|
||||
break
|
||||
|
||||
pm.peerStore[ConnectionBook][peerId] = connectedness
|
||||
pm.peerStore[DirectionBook][peerId] = direction
|
||||
|
@ -601,9 +611,10 @@ proc connectToNodes*(pm: PeerManager,
|
|||
# later.
|
||||
await sleepAsync(chronos.seconds(5))
|
||||
|
||||
# Returns the peerIds of physical connections (in and out)
|
||||
# containing at least one stream with the given protocol.
|
||||
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
|
||||
## Returns the peerIds of physical connections (in and out)
|
||||
## containing at least one stream with the given protocol.
|
||||
|
||||
var inPeers: seq[PeerId]
|
||||
var outPeers: seq[PeerId]
|
||||
|
||||
|
@ -633,30 +644,88 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
|
|||
return (numStreamsIn, numStreamsOut)
|
||||
|
||||
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
|
||||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
if amount <= 0:
|
||||
return
|
||||
|
||||
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
|
||||
let connsToPrune = min(amount, inRelayPeers.len)
|
||||
|
||||
for p in inRelayPeers[0..<connsToPrune]:
|
||||
trace "Pruning Peer", Peer = $p
|
||||
asyncSpawn(pm.switch.disconnect(p))
|
||||
|
||||
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let maxConnections = pm.switch.connManager.inSema.size
|
||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
|
||||
proc manageRelayPeers*(pm: PeerManager) {.async.} =
|
||||
if pm.wakuMetadata.shards.len == 0:
|
||||
return
|
||||
|
||||
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
|
||||
var peersToDisconnect: int
|
||||
|
||||
# TODO: Temporally disabled. Might be causing connection issues
|
||||
#if inRelayPeers.len > pm.inRelayPeersTarget:
|
||||
# await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
|
||||
# Get all connected peers for Waku Relay
|
||||
var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
|
||||
if outRelayPeers.len >= pm.outRelayPeersTarget:
|
||||
# Calculate in/out target number of peers for each shards
|
||||
let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
|
||||
let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
|
||||
|
||||
for shard in pm.wakuMetadata.shards.items:
|
||||
# Filter out peer not on this shard
|
||||
let connectedInPeers = inPeers.filterIt(
|
||||
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
|
||||
|
||||
let connectedOutPeers = outPeers.filterIt(
|
||||
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
|
||||
|
||||
# Calculate the difference between current values and targets
|
||||
let inPeerDiff = connectedInPeers.len - inTarget
|
||||
let outPeerDiff = outTarget - connectedOutPeers.len
|
||||
|
||||
if inPeerDiff > 0:
|
||||
peersToDisconnect += inPeerDiff
|
||||
|
||||
if outPeerDiff <= 0:
|
||||
continue
|
||||
|
||||
# Get all peers for this shard
|
||||
var connectablePeers = pm.peerStore.getPeersByShard(
|
||||
uint16(pm.wakuMetadata.clusterId), uint16(shard))
|
||||
|
||||
let shardCount = connectablePeers.len
|
||||
|
||||
connectablePeers.keepItIf(
|
||||
not pm.peerStore.isConnected(it.peerId) and
|
||||
pm.canBeConnected(it.peerId))
|
||||
|
||||
let connectableCount = connectablePeers.len
|
||||
|
||||
connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay))
|
||||
|
||||
let relayCount = connectablePeers.len
|
||||
|
||||
debug "Sharded Peer Management",
|
||||
shard = shard,
|
||||
connectable = $connectableCount & "/" & $shardCount,
|
||||
relayConnectable = $relayCount & "/" & $shardCount,
|
||||
relayInboundTarget = $connectedInPeers.len & "/" & $inTarget,
|
||||
relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget
|
||||
|
||||
let length = min(outPeerDiff, connectablePeers.len)
|
||||
for peer in connectablePeers[0..<length]:
|
||||
trace "Peer To Connect To", peerId = $peer.peerId
|
||||
peersToConnect.incl(peer.peerId)
|
||||
|
||||
await pm.pruneInRelayConns(peersToDisconnect)
|
||||
|
||||
if peersToConnect.len == 0:
|
||||
return
|
||||
|
||||
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||
let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials)
|
||||
let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it))
|
||||
|
||||
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
|
||||
# Connect to all nodes
|
||||
for i in countup(0, uniquePeers.len, MaxParallelDials):
|
||||
let stop = min(i + MaxParallelDials, uniquePeers.len)
|
||||
trace "Connecting to Peers", peerIds = $uniquePeers[i..<stop]
|
||||
await pm.connectToNodes(uniquePeers[i..<stop])
|
||||
|
||||
proc prunePeerStore*(pm: PeerManager) =
|
||||
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
|
||||
|
@ -733,7 +802,7 @@ proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
|
|||
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
|
||||
debug "Starting relay connectivity loop"
|
||||
while pm.started:
|
||||
await pm.connectToRelayPeers()
|
||||
await pm.manageRelayPeers()
|
||||
await sleepAsync(ConnectivityLoopInterval)
|
||||
|
||||
proc logAndMetrics(pm: PeerManager) {.async.} =
|
||||
|
@ -741,7 +810,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
|
|||
# log metrics
|
||||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let maxConnections = pm.switch.connManager.inSema.size
|
||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||
let totalConnections = pm.switch.connManager.getConnections().len
|
||||
|
@ -769,4 +837,4 @@ proc start*(pm: PeerManager) =
|
|||
asyncSpawn pm.logAndMetrics()
|
||||
|
||||
proc stop*(pm: PeerManager) =
|
||||
pm.started = false
|
||||
pm.started = false
|
|
@ -4,7 +4,7 @@ else:
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, sets, options, times, strutils],
|
||||
std/[tables, sequtils, sets, options, strutils],
|
||||
chronos,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/builders,
|
||||
|
@ -12,6 +12,8 @@ import
|
|||
|
||||
import
|
||||
../../waku_core,
|
||||
../../waku_enr/sharding,
|
||||
../../waku_enr/capabilities,
|
||||
../../common/utils/sequence
|
||||
|
||||
export peerstore, builders
|
||||
|
@ -95,10 +97,13 @@ proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo]
|
|||
peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
|
||||
# Return the connection state of the given, managed peer
|
||||
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
|
||||
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
|
||||
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
|
||||
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
|
||||
|
||||
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
|
||||
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
|
||||
|
||||
proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
|
||||
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
|
||||
|
||||
proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
|
||||
# Returns `true` if the peer is connected
|
||||
|
@ -131,3 +136,9 @@ proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInf
|
|||
|
||||
proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
|
||||
|
||||
proc getPeersByShard*(peerStore: PeerStore, cluster, shard: uint16): seq[RemotePeerInfo] =
|
||||
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard))
|
||||
|
||||
proc getPeersByCapability*(peerStore: PeerStore, cap: Capabilities): seq[RemotePeerInfo] =
|
||||
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
|
|
@ -35,7 +35,8 @@ import
|
|||
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
|
||||
../waku_filter_v2,
|
||||
../waku_filter_v2/client as filter_client,
|
||||
../waku_lightpush,
|
||||
../waku_lightpush/common,
|
||||
../waku_lightpush/protocol,
|
||||
../waku_metadata,
|
||||
../waku_lightpush/client as lightpush_client,
|
||||
../waku_enr,
|
||||
|
|
|
@ -17,7 +17,7 @@ import
|
|||
../../../waku_store,
|
||||
../../../waku_filter,
|
||||
../../../waku_filter_v2,
|
||||
../../../waku_lightpush,
|
||||
../../../waku_lightpush/common,
|
||||
../../../waku_relay,
|
||||
../../../waku_node,
|
||||
../../../node/peer_manager,
|
||||
|
|
|
@ -5,7 +5,6 @@ else:
|
|||
|
||||
import
|
||||
std/strformat,
|
||||
std/sequtils,
|
||||
stew/byteutils,
|
||||
chronicles,
|
||||
json_serialization,
|
||||
|
@ -14,10 +13,9 @@ import
|
|||
presto/common
|
||||
|
||||
import
|
||||
../../../waku_core,
|
||||
../../waku/node/peer_manager,
|
||||
../../../waku_node,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../handlers,
|
||||
../serdes,
|
||||
../responses,
|
||||
|
|
|
@ -84,7 +84,6 @@ proc init*(T: typedesc[RemotePeerInfo],
|
|||
let peerId = PeerID.init(peerId).tryGet()
|
||||
RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
|
||||
|
||||
|
||||
## Parse
|
||||
|
||||
proc validWireAddr*(ma: MultiAddress): bool =
|
||||
|
@ -217,11 +216,15 @@ converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
|
|||
converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
|
||||
## Converts the local peerInfo to dialable RemotePeerInfo
|
||||
## Useful for testing or internal connections
|
||||
RemotePeerInfo.init(
|
||||
peerInfo.peerId,
|
||||
peerInfo.listenAddrs,
|
||||
none(enr.Record),
|
||||
peerInfo.protocols
|
||||
RemotePeerInfo(
|
||||
peerId: peerInfo.peerId,
|
||||
addrs: peerInfo.listenAddrs,
|
||||
enr: none(Record),
|
||||
protocols: peerInfo.protocols,
|
||||
|
||||
agent: peerInfo.agentVersion,
|
||||
protoVersion: peerInfo.protoVersion,
|
||||
publicKey: peerInfo.publicKey,
|
||||
)
|
||||
|
||||
proc hasProtocol*(ma: MultiAddress, proto: string): bool =
|
||||
|
|
|
@ -14,7 +14,7 @@ import
|
|||
../node/peer_manager,
|
||||
../utils/requests,
|
||||
../waku_core,
|
||||
./protocol,
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
chronos,
|
||||
libp2p/peerid
|
||||
import
|
||||
../waku_core
|
||||
|
||||
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
type WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
type PushMessageHandler* = proc(
|
||||
peer: PeerId,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
|
@ -11,26 +11,17 @@ import
|
|||
metrics,
|
||||
bearssl/rand
|
||||
import
|
||||
../node/peer_manager,
|
||||
../node/peer_manager/peer_manager,
|
||||
../waku_core,
|
||||
./common,
|
||||
./rpc,
|
||||
./rpc_codec,
|
||||
./protocol_metrics
|
||||
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "waku lightpush"
|
||||
|
||||
|
||||
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
|
||||
|
||||
type
|
||||
WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
type WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
pushHandler*: PushMessageHandler
|
||||
|
|
|
@ -112,7 +112,7 @@ proc new*(T: type WakuMetadata,
|
|||
|
||||
wm.initProtocolHandler()
|
||||
|
||||
info "Created WakuMetadata protocol", clusterId=cluster
|
||||
info "Created WakuMetadata protocol", clusterId=wm.clusterId, shards=wm.shards
|
||||
|
||||
return wm
|
||||
|
||||
|
|
Loading…
Reference in New Issue