feat: shard aware relay peer management (#2332)

note that this feature is behind a config flag. `--relay-shard-manager`
This commit is contained in:
Simon-Pierre Vivier 2024-01-30 07:28:21 -05:00 committed by GitHub
parent e04e35e229
commit edca1df1a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 490 additions and 195 deletions

View File

@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[strformat, strutils, times, json, options, random]
import std/[strformat, strutils, times, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,

View File

@ -54,7 +54,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
@ -294,7 +294,9 @@ proc initNode(conf: WakuNodeConf,
agentString = some(conf.agentString)
)
builder.withColocationLimit(conf.colocationLimit)
builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)
builder.withPeerManagerConfig(
maxRelayPeers = conf.maxRelayPeers,
shardAware = conf.relayShardedPeerManagement,)
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

View File

@ -199,6 +199,11 @@ type
defaultValue: false
name: "relay-peer-exchange" }: bool
relayShardedPeerManagement* {.
desc: "Enable experimental shard aware peer manager for relay protocol: true|false",
defaultValue: false
name: "relay-shard-manager" }: bool
rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
defaultValue: false

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, sequtils, times],
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@ -21,10 +21,12 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
@ -129,7 +131,6 @@ procSuite "Peer Manager":
await node.stop()
asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@ -226,18 +227,34 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"
check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))
check:
@ -247,10 +264,17 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)
node3.mountMetadata(0).expect("Mounted Waku Metadata")
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
@ -258,8 +282,83 @@ procSuite "Peer Manager":
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()
await sleepAsync(chronos.milliseconds(500))
check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Sharded peer manager can use persistent storage and survive restarts":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"
check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)
node3.mountMetadata(0).expect("Mounted Waku Metadata")
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
await node3.peerManager.manageRelayPeers()
await sleepAsync(chronos.milliseconds(500))
check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
@ -298,9 +397,9 @@ procSuite "Peer Manager":
topics = @["/waku/2/rs/4/0"],
)
discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
@ -319,7 +418,6 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome
# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
@ -378,14 +476,28 @@ procSuite "Peer Manager":
asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)
# Start them
await allFutures(nodes.mapIt(it.start()))
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
@ -416,6 +528,60 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Sharded peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)
# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
# Get all peer infos
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])
# Connect to relay peers
await nodes[0].peerManager.manageRelayPeers()
check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

View File

@ -11,6 +11,7 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,

View File

@ -4,16 +4,12 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
chronos
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode

View File

@ -32,9 +32,10 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
topics: @[],
relay: true,
maxMessageSize: "1024 KiB"
maxMessageSize: "1024 KiB",
clusterId: 1.uint32,
topics: @["/waku/2/rs/1/0"],
relay: true
)
proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
@ -56,8 +57,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
clusterId: uint32 = 1.uint32,
topics: seq[string] = @["/waku/2/rs/1/0"],
peerStoreCapacity = none(int)): WakuNode =
var resolvedExtIp = extIp
@ -67,7 +68,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort
let conf = defaultTestWakuNodeConf()
var conf = defaultTestWakuNodeConf()
conf.clusterId = clusterId
conf.topics = topics
if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails

View File

@ -10,11 +10,10 @@ import
import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,

View File

@ -36,6 +36,7 @@ type
# Peer manager config
maxRelayPeers: Option[int]
colocationLimit: int
shardAware: bool
# Libp2p switch
switchMaxConnections: Option[int]
@ -105,8 +106,10 @@ proc withPeerStorage*(builder: var WakuNodeBuilder, peerStorage: PeerStorage, ca
builder.peerStorageCapacity = capacity
proc withPeerManagerConfig*(builder: var WakuNodeBuilder,
maxRelayPeers = none(int)) =
maxRelayPeers = none(int),
shardAware = false) =
builder.maxRelayPeers = maxRelayPeers
builder.shardAware = shardAware
proc withColocationLimit*(builder: var WakuNodeBuilder,
colocationLimit: int) =
@ -174,6 +177,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
storage = builder.peerStorage.get(nil),
maxRelayPeers = builder.maxRelayPeers,
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
)
var node: WakuNode

View File

@ -5,7 +5,7 @@ else:
import
std/[options, sets, tables, sequtils, times, strutils, math],
std/[options, sets, sequtils, times, strutils, math, random],
chronos,
chronicles,
metrics,
@ -18,6 +18,7 @@ import
../../waku_core,
../../waku_relay,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
./peer_store/peer_storage,
./waku_peer_store
@ -36,6 +37,8 @@ declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress "
logScope:
topics = "waku node peer_manager"
randomize()
const
# TODO: Make configurable
DefaultDialTimeout = chronos.seconds(10)
@ -50,10 +53,10 @@ const
BackoffFactor = 4
# Limit the amount of paralel dials
MaxParalelDials = 10
MaxParallelDials = 10
# Delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(15)
ConnectivityLoopInterval = chronos.minutes(1)
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(10)
@ -80,6 +83,7 @@ type
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool
shardedPeerManagement: bool # temp feature flag
proc protocolMatcher*(codec: string): Matcher =
## Returns a protocol matcher function for the provided codec
@ -116,22 +120,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
# Do not attempt to manage our unmanageable self
return
# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
# Peer already managed and ENR info is already saved
return
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
if remotePeerInfo.protocols.len > 0:
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
if remotePeerInfo.enr.isSome():
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
@ -162,24 +165,27 @@ proc connectRelay*(pm: PeerManager,
trace "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
var deadline = sleepAsync(dialTimeout)
var workfut = pm.switch.connect(peerId, peer.addrs)
var reasonFailed = ""
let workfut = pm.switch.connect(peerId, peer.addrs)
# Can't use catch: with .withTimeout() in this case
let res = catch: await workfut or deadline
try:
await workfut or deadline
if workfut.finished():
let reasonFailed =
if not workfut.finished():
await workfut.cancelAndWait()
"timed out"
elif res.isErr(): res.error.msg
else:
if not deadline.finished():
deadline.cancel()
await deadline.cancelAndWait()
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return true
else:
reasonFailed = "timed out"
await cancelAndWait(workfut)
except CatchableError as exc:
reasonFailed = "remote peer failed"
pm.peerStore[NumberFailedConnBook][peerId] = 0
return true
# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
@ -215,15 +221,15 @@ proc dialPeer(pm: PeerManager,
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
var reasonFailed = ""
try:
if (await dialFut.withTimeout(dialTimeout)):
let res = catch:
if await dialFut.withTimeout(dialTimeout):
return some(dialFut.read())
else:
reasonFailed = "timeout"
await cancelAndWait(dialFut)
except CatchableError as exc:
reasonFailed = "failed"
else: await cancelAndWait(dialFut)
let reasonFailed =
if res.isOk: "timed out"
else: res.error.msg
trace "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
@ -294,107 +300,109 @@ proc canBeConnected*(pm: PeerManager,
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
if now >= (lastFailed + backoff):
return true
return false
return now >= (lastFailed + backoff)
##################
# Initialisation #
##################
proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if pm.switch.connManager.getConnections().hasKey(peerId):
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len != 0:
let observedAddr = conns[0].connection.observedAddr
let ip = observedAddr.get.getHostname()
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
return some(ip)
return none(string)
if not pm.switch.connManager.getConnections().hasKey(peerId):
return none(string)
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len == 0:
return none(string)
let obAddr = conns[0].connection.observedAddr.valueOr:
return none(string)
# TODO: think if circuit relay ips should be handled differently
return some(obAddr.getHostname())
# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard
of ConnEventKind.Connected:
#let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# To prevent metadata protocol from breaking prev nodes, by now we only
# disconnect if the clusterid is specified.
if pm.wakuMetadata.clusterId == 0:
return
let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)
var reason: string
block guardClauses:
let conn = res.valueOr:
reason = "dial failed: " & error.msg
break guardClauses
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "waku metatdata request failed: " & error
break guardClauses
let clusterId = metadata.clusterId.valueOr:
reason = "empty cluster-id reported"
break guardClauses
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break guardClauses
if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
reason = "no shards in common"
break guardClauses
return
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.onPeerMetadata(peerId)
var direction: PeerDirection
var connectedness: Connectedness
if event.kind == PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
case event.kind:
of Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
var clusterOk = false
var reason = ""
# To prevent metadata protocol from breaking prev nodes, by now we only
# disconnect if the clusterid is specified.
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
block wakuMetadata:
var conn: Connection
try:
conn = await pm.switch.dial(peerId, WakuMetadataCodec)
except CatchableError:
reason = "waku metadata codec not supported: " & getCurrentExceptionMsg()
break wakuMetadata
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
# request metadata from connecting peer
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "failed waku metadata codec request"
break wakuMetadata
# does not report any clusterId
let clusterId = metadata.clusterId.valueOr:
reason = "empty clusterId reported"
break wakuMetadata
# drop it if it doesnt match our network id
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break wakuMetadata
# reaching here means the clusterId matches
clusterOk = true
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
# TODO: Take action depending on the supported shards as reported by metadata
let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
let peersBehindIp = pm.ipTable[ip.get]
# pm.colocationLimit == 0 disables the ip colocation limit
if pm.colocationLimit != 0 and
peersBehindIp.len > pm.colocationLimit:
# in theory this should always be one, but just in case
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
let peersBehindIp = pm.ipTable[ip.get]
# pm.colocationLimit == 0 disables the ip colocation limit
if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit:
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
of Left:
direction = UnknownDirection
connectedness = CanConnect
elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect
# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
@ -413,7 +421,8 @@ proc new*(T: type PeerManager,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,): PeerManager =
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false): PeerManager =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@ -459,7 +468,8 @@ proc new*(T: type PeerManager,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,)
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
@ -604,9 +614,10 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))
# Returns the peerIds of physical connections (in and out)
# containing at least one stream with the given protocol.
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
@ -636,10 +647,14 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
return (numStreamsIn, numStreamsOut)
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
if amount <= 0:
return
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)
for p in inRelayPeers[0..<connsToPrune]:
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
@ -657,10 +672,86 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials)
let numPeersToConnect = min(outsideBackoffPeers.len, MaxParallelDials)
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
proc manageRelayPeers*(pm: PeerManager) {.async.} =
if pm.wakuMetadata.shards.len == 0:
return
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
var peersToDisconnect: int
# Get all connected peers for Waku Relay
var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
# Calculate in/out target number of peers for each shards
let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
for shard in pm.wakuMetadata.shards.items:
# Filter out peer not on this shard
let connectedInPeers = inPeers.filterIt(
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
let connectedOutPeers = outPeers.filterIt(
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
# Calculate the difference between current values and targets
let inPeerDiff = connectedInPeers.len - inTarget
let outPeerDiff = outTarget - connectedOutPeers.len
if inPeerDiff > 0:
peersToDisconnect += inPeerDiff
if outPeerDiff <= 0:
continue
# Get all peers for this shard
var connectablePeers = pm.peerStore.getPeersByShard(
uint16(pm.wakuMetadata.clusterId), uint16(shard))
let shardCount = connectablePeers.len
connectablePeers.keepItIf(
not pm.peerStore.isConnected(it.peerId) and
pm.canBeConnected(it.peerId))
let connectableCount = connectablePeers.len
connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay))
let relayCount = connectablePeers.len
debug "Sharded Peer Management",
shard = shard,
connectable = $connectableCount & "/" & $shardCount,
relayConnectable = $relayCount & "/" & $shardCount,
relayInboundTarget = $connectedInPeers.len & "/" & $inTarget,
relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget
# Always pick random connectable relay peers
shuffle(connectablePeers)
let length = min(outPeerDiff, connectablePeers.len)
for peer in connectablePeers[0..<length]:
trace "Peer To Connect To", peerId = $peer.peerId
peersToConnect.incl(peer.peerId)
await pm.pruneInRelayConns(peersToDisconnect)
if peersToConnect.len == 0:
return
let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it))
# Connect to all nodes
for i in countup(0, uniquePeers.len, MaxParallelDials):
let stop = min(i + MaxParallelDials, uniquePeers.len)
trace "Connecting to Peers", peerIds = $uniquePeers[i..<stop]
await pm.connectToNodes(uniquePeers[i..<stop])
proc prunePeerStore*(pm: PeerManager) =
let numPeers = pm.peerStore[AddressBook].book.len
let capacity = pm.peerStore.capacity
@ -681,7 +772,10 @@ proc prunePeerStore*(pm: PeerManager) =
peersToPrune.incl(peerId)
let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
var notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
# Always pick random non-connected peers
shuffle(notConnected)
var shardlessPeers: seq[PeerId]
var peersByShard = initTable[uint16, seq[PeerId]]()
@ -775,7 +869,9 @@ proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
trace "Starting relay connectivity loop"
while pm.started:
await pm.connectToRelayPeers()
if pm.shardedPeerManagement:
await pm.manageRelayPeers()
else: await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)
proc logAndMetrics(pm: PeerManager) {.async.} =
@ -783,7 +879,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len
@ -811,4 +906,4 @@ proc start*(pm: PeerManager) =
asyncSpawn pm.logAndMetrics()
proc stop*(pm: PeerManager) =
pm.started = false
pm.started = false

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[tables, sequtils, sets, options, times, strutils],
std/[tables, sequtils, sets, options, strutils],
chronos,
eth/p2p/discoveryv5/enr,
libp2p/builders,
@ -12,6 +12,8 @@ import
import
../../waku_core,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../common/utils/sequence
export peerstore, builders
@ -95,10 +97,13 @@ proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo]
peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
# Return the connection state of the given, managed peer
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
# Returns `true` if the peer is connected
@ -131,3 +136,9 @@ proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInf
proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
proc getPeersByShard*(peerStore: PeerStore, cluster, shard: uint16): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard))
proc getPeersByCapability*(peerStore: PeerStore, cap: Capabilities): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))

View File

@ -36,9 +36,10 @@ import
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_lightpush,
../waku_metadata,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_enr,
../waku_dnsdisc,
../waku_peer_exchange,

View File

@ -16,7 +16,7 @@ import
../../../waku_store,
../../../waku_filter,
../../../waku_filter_v2,
../../../waku_lightpush,
../../../waku_lightpush/common,
../../../waku_relay,
../../../waku_node,
../../../node/peer_manager,

View File

@ -5,7 +5,6 @@ else:
import
std/strformat,
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
@ -14,10 +13,9 @@ import
presto/common
import
../../../waku_core,
../../waku/node/peer_manager,
../../../waku_node,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../handlers,
../serdes,
../responses,

View File

@ -84,7 +84,6 @@ proc init*(T: typedesc[RemotePeerInfo],
let peerId = PeerID.init(peerId).tryGet()
RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
## Parse
proc validWireAddr*(ma: MultiAddress): bool =
@ -217,11 +216,15 @@ converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections
RemotePeerInfo.init(
peerInfo.peerId,
peerInfo.listenAddrs,
none(enr.Record),
peerInfo.protocols
RemotePeerInfo(
peerId: peerInfo.peerId,
addrs: peerInfo.listenAddrs,
enr: none(Record),
protocols: peerInfo.protocols,
agent: peerInfo.agentVersion,
protoVersion: peerInfo.protoVersion,
publicKey: peerInfo.publicKey,
)
proc hasProtocol*(ma: MultiAddress, proto: string): bool =

View File

@ -14,7 +14,7 @@ import
../node/peer_manager,
../utils/requests,
../waku_core,
./protocol,
./common,
./protocol_metrics,
./rpc,
./rpc_codec

View File

@ -0,0 +1,21 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/results,
chronos,
libp2p/peerid
import
../waku_core
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
type WakuLightPushResult*[T] = Result[T, string]
type PushMessageHandler* = proc(
peer: PeerId,
pubsubTopic: PubsubTopic,
message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}

View File

@ -12,26 +12,17 @@ import
metrics,
bearssl/rand
import
../node/peer_manager,
../node/peer_manager/peer_manager,
../waku_core,
./common,
./rpc,
./rpc_codec,
./protocol_metrics
logScope:
topics = "waku lightpush"
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
type
WakuLightPushResult*[T] = Result[T, string]
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async, closure.}
WakuLightPush* = ref object of LPProtocol
type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler

View File

@ -112,7 +112,7 @@ proc new*(T: type WakuMetadata,
wm.initProtocolHandler()
info "Created WakuMetadata protocol", clusterId=cluster
info "Created WakuMetadata protocol", clusterId=wm.clusterId, shards=wm.shards
return wm