fix: Revert "feat: shard aware peer management (#2151)" (#2312)

This reverts commit dba9820c1fa00f414f18d57f7a3ff38b67d2bb1a.

We need to revert this commit because
the waku-simulator stopped working. i.e. the nodes couldn't establish
connections among them: 054ba9e33f

Also, the following js-waku test fails due to this commit:
"same cluster, different shard: nodes connect"

* waku_lightpush/protocol.nim: minor changes to make it compile after revert
This commit is contained in:
Ivan FB 2023-12-20 15:23:41 +01:00 committed by GitHub
parent a1b27edf80
commit 32668f43f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 215 additions and 343 deletions

View File

@ -9,9 +9,10 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[strformat, strutils, times, options, random]
import std/[strformat, strutils, times, json, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
@ -21,10 +22,11 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,

View File

@ -53,7 +53,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, sequtils, times, sugar],
std/[options, sequtils, times],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@ -21,12 +21,10 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
@ -37,7 +35,7 @@ import
procSuite "Peer Manager":
asyncTest "connectRelay() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
@ -50,7 +48,7 @@ procSuite "Peer Manager":
asyncTest "dialPeer() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
@ -78,7 +76,7 @@ procSuite "Peer Manager":
asyncTest "dialPeer() fails gracefully":
# Create 2 nodes and start them
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
@ -101,7 +99,7 @@ procSuite "Peer Manager":
asyncTest "Adding, selecting and filtering peers work":
let
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
@ -130,9 +128,10 @@ procSuite "Peer Manager":
await node.stop()
asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
@ -176,7 +175,7 @@ procSuite "Peer Manager":
asyncTest "Peer manager updates failed peers correctly":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
@ -226,34 +225,18 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"
check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
# wait for the peer store update
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))
check:
@ -263,17 +246,10 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("127.0.0.1"),
Port(56037),
peerStorage = storage
)
node3.mountMetadata(0).expect("Mounted Waku Metadata")
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
@ -281,10 +257,7 @@ procSuite "Peer Manager":
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
await node3.peerManager.manageRelayPeers()
await sleepAsync(chronos.milliseconds(500))
await node3.peerManager.connectToRelayPeers()
check:
# Reconnected to node2 after "restart"
@ -302,7 +275,7 @@ procSuite "Peer Manager":
# different network
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId3,
topics = @["/waku/2/rs/3/0"],
@ -311,22 +284,22 @@ procSuite "Peer Manager":
# same network
node2 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
@ -345,13 +318,14 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome
# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
@ -375,7 +349,7 @@ procSuite "Peer Manager":
# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
await node3.mountRelay()
node3.wakuRelay.codec = stableCodec
@ -403,28 +377,14 @@ procSuite "Peer Manager":
asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = parseIpAddress("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Get all peer infos
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
@ -432,7 +392,7 @@ procSuite "Peer Manager":
nodes[0].peerManager.addPeer(peerInfos[3])
# Connect to relay peers
await nodes[0].peerManager.manageRelayPeers()
await nodes[0].peerManager.connectToRelayPeers()
check:
# Peerstore track all three peers
@ -457,7 +417,7 @@ procSuite "Peer Manager":
asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them
await allFutures(nodes.mapIt(it.start()))
@ -520,7 +480,7 @@ procSuite "Peer Manager":
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
let
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peers = toSeq(1..5)
.mapIt(
parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)
@ -562,7 +522,7 @@ procSuite "Peer Manager":
asyncTest "connectedPeers() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
@ -613,7 +573,7 @@ procSuite "Peer Manager":
asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
@ -839,7 +799,7 @@ procSuite "Peer Manager":
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))

View File

@ -11,7 +11,6 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,

View File

@ -4,12 +4,16 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronos
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
import
../../waku/waku_core,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode

View File

@ -32,8 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
clusterId: 1.uint32,
topics: @["/waku/2/rs/1/0"],
topics: @[],
relay: true
)
@ -56,8 +55,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 1.uint32,
topics: seq[string] = @["/waku/2/rs/1/0"],
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
peerStoreCapacity = none(int)): WakuNode =
var resolvedExtIp = extIp
@ -67,10 +66,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort
var conf = defaultTestWakuNodeConf()
conf.clusterId = clusterId
conf.topics = topics
let conf = defaultTestWakuNodeConf()
if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails

View File

@ -10,10 +10,11 @@ import
import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,

View File

@ -18,7 +18,6 @@ import
../../waku_core,
../../waku_relay,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
./peer_store/peer_storage,
./waku_peer_store
@ -51,7 +50,7 @@ const
BackoffFactor = 4
# Limit the amount of paralel dials
MaxParallelDials = 10
MaxParalelDials = 10
# Delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(15)
@ -117,21 +116,22 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
# Do not attempt to manage our unmanageable self
return
# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and
pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
# Peer already managed and ENR info is already saved
return
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
if remotePeerInfo.protocols.len > 0:
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
if remotePeerInfo.enr.isSome():
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
@ -159,31 +159,27 @@ proc connectRelay*(pm: PeerManager,
pm.addPeer(peer)
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
trace "Connecting to relay peer",
wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
trace "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
var deadline = sleepAsync(dialTimeout)
let workfut = pm.switch.connect(peerId, peer.addrs)
# Can't use catch: with .withTimeout() in this case
let res = catch: await workfut or deadline
var workfut = pm.switch.connect(peerId, peer.addrs)
var reasonFailed = ""
let reasonFailed =
if not workfut.finished():
await workfut.cancelAndWait()
"timed out"
elif res.isErr(): res.error.msg
else:
try:
await workfut or deadline
if workfut.finished():
if not deadline.finished():
await deadline.cancelAndWait()
deadline.cancel()
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return true
else:
reasonFailed = "timed out"
await cancelAndWait(workfut)
except CatchableError as exc:
reasonFailed = "remote peer failed"
# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
@ -219,15 +215,15 @@ proc dialPeer(pm: PeerManager,
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
let res = catch:
if await dialFut.withTimeout(dialTimeout):
var reasonFailed = ""
try:
if (await dialFut.withTimeout(dialTimeout)):
return some(dialFut.read())
else: await cancelAndWait(dialFut)
let reasonFailed =
if res.isOk: "timed out"
else: res.error.msg
else:
reasonFailed = "timeout"
await cancelAndWait(dialFut)
except CatchableError as exc:
reasonFailed = "failed"
trace "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
@ -298,108 +294,105 @@ proc canBeConnected*(pm: PeerManager,
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
return now >= (lastFailed + backoff)
if now >= (lastFailed + backoff):
return true
return false
##################
# Initialisation #
##################
proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if not pm.switch.connManager.getConnections().hasKey(peerId):
return none(string)
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len == 0:
return none(string)
let obAddr = conns[0].connection.observedAddr.valueOr:
return none(string)
# TODO: think if circuit relay ips should be handled differently
return some(obAddr.getHostname())
if pm.switch.connManager.getConnections().hasKey(peerId):
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len != 0:
let observedAddr = conns[0].connection.observedAddr
let ip = observedAddr.get.getHostname()
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
return some(ip)
return none(string)
# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
#let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# To prevent metadata protocol from breaking prev nodes, by now we only
# disconnect if the clusterid is specified.
if pm.wakuMetadata.clusterId == 0:
return
let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)
var reason: string
block guardClauses:
let conn = res.valueOr:
reason = "dial failed: " & error.msg
break guardClauses
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "waku metatdata request failed: " & error
break guardClauses
let clusterId = metadata.clusterId.valueOr:
reason = "empty cluster-id reported"
break guardClauses
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break guardClauses
if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
reason = "no shards in common"
break guardClauses
return
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
of ConnEventKind.Connected:
let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard
# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.onPeerMetadata(peerId)
var direction: PeerDirection
var connectedness: Connectedness
case event.kind:
of Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
if event.kind == PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
var clusterOk = false
var reason = ""
# To prevent metadata protocol from breaking prev nodes, by now we only
# disconnect if the clusterid is specified.
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
block wakuMetadata:
var conn: Connection
try:
conn = await pm.switch.dial(peerId, WakuMetadataCodec)
except CatchableError:
reason = "waku metadata codec not supported: " & getCurrentExceptionMsg()
break wakuMetadata
# request metadata from connecting peer
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "failed waku metadata codec request"
break wakuMetadata
# does not report any clusterId
let clusterId = metadata.clusterId.valueOr:
reason = "empty clusterId reported"
break wakuMetadata
# drop it if it doesnt match our network id
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break wakuMetadata
# reaching here means the clusterId matches
clusterOk = true
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
# TODO: Take action depending on the supported shards as reported by metadata
let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
let peersBehindIp = pm.ipTable[ip.get]
if peersBehindIp.len > pm.colocationLimit:
# in theory this should always be one, but just in case
let peersBehindIp = pm.ipTable[ip.get]
let idx = max((peersBehindIp.len - pm.colocationLimit), 0)
for peerId in peersBehindIp[0..<idx]:
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
of Left:
direction = UnknownDirection
connectedness = CanConnect
# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect
# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
@ -609,10 +602,9 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))
# Returns the peerIds of physical connections (in and out)
# containing at least one stream with the given protocol.
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
@ -642,88 +634,30 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
return (numStreamsIn, numStreamsOut)
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
if amount <= 0:
return
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)
for p in inRelayPeers[0..<connsToPrune]:
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))
proc manageRelayPeers*(pm: PeerManager) {.async.} =
if pm.wakuMetadata.shards.len == 0:
return
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
var peersToDisconnect: int
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
# Get all connected peers for Waku Relay
var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
# TODO: Temporally disabled. Might be causing connection issues
#if inRelayPeers.len > pm.inRelayPeersTarget:
# await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
# Calculate in/out target number of peers for each shards
let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
for shard in pm.wakuMetadata.shards.items:
# Filter out peer not on this shard
let connectedInPeers = inPeers.filterIt(
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
let connectedOutPeers = outPeers.filterIt(
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
# Calculate the difference between current values and targets
let inPeerDiff = connectedInPeers.len - inTarget
let outPeerDiff = outTarget - connectedOutPeers.len
if inPeerDiff > 0:
peersToDisconnect += inPeerDiff
if outPeerDiff <= 0:
continue
# Get all peers for this shard
var connectablePeers = pm.peerStore.getPeersByShard(
uint16(pm.wakuMetadata.clusterId), uint16(shard))
let shardCount = connectablePeers.len
connectablePeers.keepItIf(
not pm.peerStore.isConnected(it.peerId) and
pm.canBeConnected(it.peerId))
let connectableCount = connectablePeers.len
connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay))
let relayCount = connectablePeers.len
trace "Sharded Peer Management",
shard = shard,
connectable = $connectableCount & "/" & $shardCount,
relayConnectable = $relayCount & "/" & $shardCount,
relayInboundTarget = $connectedInPeers.len & "/" & $inTarget,
relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget
let length = min(outPeerDiff, connectablePeers.len)
for peer in connectablePeers[0..<length]:
trace "Peer To Connect To", peerId = $peer.peerId
peersToConnect.incl(peer.peerId)
await pm.pruneInRelayConns(peersToDisconnect)
if peersToConnect.len == 0:
if outRelayPeers.len >= pm.outRelayPeersTarget:
return
let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it))
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials)
# Connect to all nodes
for i in countup(0, uniquePeers.len, MaxParallelDials):
let stop = min(i + MaxParallelDials, uniquePeers.len)
trace "Connecting to Peers", peerIds = $uniquePeers[i..<stop]
await pm.connectToNodes(uniquePeers[i..<stop])
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
proc prunePeerStore*(pm: PeerManager) =
let numPeers = pm.peerStore[AddressBook].book.len
@ -839,7 +773,7 @@ proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
trace "Starting relay connectivity loop"
while pm.started:
await pm.manageRelayPeers()
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)
proc logAndMetrics(pm: PeerManager) {.async.} =
@ -847,6 +781,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len
@ -874,4 +809,4 @@ proc start*(pm: PeerManager) =
asyncSpawn pm.logAndMetrics()
proc stop*(pm: PeerManager) =
pm.started = false
pm.started = false

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[tables, sequtils, sets, options, strutils],
std/[tables, sequtils, sets, options, times, strutils],
chronos,
eth/p2p/discoveryv5/enr,
libp2p/builders,
@ -12,8 +12,6 @@ import
import
../../waku_core,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../common/utils/sequence
export peerstore, builders
@ -97,13 +95,10 @@ proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo]
peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
# Return the connection state of the given, managed peer
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
# Returns `true` if the peer is connected
@ -136,9 +131,3 @@ proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInf
proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
proc getPeersByShard*(peerStore: PeerStore, cluster, shard: uint16): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard))
proc getPeersByCapability*(peerStore: PeerStore, cap: Capabilities): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))

View File

@ -35,8 +35,7 @@ import
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_lightpush,
../waku_metadata,
../waku_lightpush/client as lightpush_client,
../waku_enr,

View File

@ -17,7 +17,7 @@ import
../../../waku_store,
../../../waku_filter,
../../../waku_filter_v2,
../../../waku_lightpush/common,
../../../waku_lightpush,
../../../waku_relay,
../../../waku_node,
../../../node/peer_manager,

View File

@ -5,6 +5,7 @@ else:
import
std/strformat,
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
@ -13,9 +14,10 @@ import
presto/common
import
../../../waku_core,
../../waku/node/peer_manager,
../../../waku_node,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../handlers,
../serdes,
../responses,

View File

@ -84,6 +84,7 @@ proc init*(T: typedesc[RemotePeerInfo],
let peerId = PeerID.init(peerId).tryGet()
RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
## Parse
proc validWireAddr*(ma: MultiAddress): bool =
@ -216,15 +217,11 @@ converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections
RemotePeerInfo(
peerId: peerInfo.peerId,
addrs: peerInfo.listenAddrs,
enr: none(Record),
protocols: peerInfo.protocols,
agent: peerInfo.agentVersion,
protoVersion: peerInfo.protoVersion,
publicKey: peerInfo.publicKey,
RemotePeerInfo.init(
peerInfo.peerId,
peerInfo.listenAddrs,
none(enr.Record),
peerInfo.protocols
)
proc hasProtocol*(ma: MultiAddress, proto: string): bool =

View File

@ -14,7 +14,7 @@ import
../node/peer_manager,
../utils/requests,
../waku_core,
./common,
./protocol,
./protocol_metrics,
./rpc,
./rpc_codec

View File

@ -1,21 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/results,
chronos,
libp2p/peerid
import
../waku_core
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
type WakuLightPushResult*[T] = Result[T, string]
type PushMessageHandler* = proc(
peer: PeerId,
pubsubTopic: PubsubTopic,
message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}

View File

@ -11,20 +11,29 @@ import
metrics,
bearssl/rand
import
../node/peer_manager/peer_manager,
../node/peer_manager,
../waku_core,
./common,
./rpc,
./rpc_codec,
./protocol_metrics
logScope:
topics = "waku lightpush"
type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
type
WakuLightPushResult*[T] = Result[T, string]
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async, closure.}
WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
proc handleRequest*(wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]): Future[PushRPC] {.async.} =
let reqDecodeRes = PushRPC.decode(buffer)

View File

@ -112,7 +112,7 @@ proc new*(T: type WakuMetadata,
wm.initProtocolHandler()
info "Created WakuMetadata protocol", clusterId=wm.clusterId, shards=wm.shards
info "Created WakuMetadata protocol", clusterId=cluster
return wm