nwaku/tests/test_peer_manager.nim

1153 lines
39 KiB
Nim
Raw Normal View History

{.used.}
import
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
2022-11-04 09:52:08 +00:00
json_rpc/rpcserver,
json_rpc/rpcclient,
eth/keys,
eth/common/eth_types,
2021-06-09 14:37:08 +00:00
libp2p/[builders, switch, multiaddress],
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/message,
libp2p/peerid
import
../../waku/common/databases/db_sqlite,
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_filter_v2/common,
../../waku/waku_store/common,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
./testlib/testutils,
./testlib/wakucore,
./testlib/wakunode
procSuite "Peer Manager":
asyncTest "connectRelay() works":
# Create 2 nodes
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
let connOk =
await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
await sleepAsync(chronos.milliseconds(500))
check:
connOk == true
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[1].peerInfo.peerId
)
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) ==
Connectedness.Connected
asyncTest "dialPeer() works":
# Create 2 nodes
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(
nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec
)
await sleepAsync(chronos.milliseconds(500))
# Check connection
check:
conn.isSome()
conn.get.activity
conn.get.peerId == nodes[1].peerInfo.peerId
# Check that node2 is being managed in node1
check:
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[1].peerInfo.peerId
)
# Check connectedness
check:
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) ==
Connectedness.Connected
await allFutures(nodes.mapIt(it.stop()))
asyncTest "dialPeer() fails gracefully":
# Create 2 nodes and start them
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
let nonExistentPeerRes = parsePeerInfo(
"/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmQSMNExfUYUqfuXWkD5DaNZnMYnigRxFKbk3tcEFQeQeE"
)
require nonExistentPeerRes.isOk()
let nonExistentPeer = nonExistentPeerRes.value
# Dial non-existent peer from node1
let conn1 =
await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec)
check:
conn1.isNone()
# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(
nodes[1].peerInfo.toRemotePeerInfo(), WakuStoreCodec
)
check:
conn2.isNone()
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Adding, selecting and filtering peers work":
let
node =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterPeer = PeerInfo.new(generateEcdsaKey(), @[filterLoc])
# Create store peer
storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
storePeer = PeerInfo.new(generateEcdsaKey(), @[storeLoc])
await node.start()
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(
filterPeer.toRemotePeerInfo(), WakuFilterSubscribeCodec
)
# Check peers were successfully added to peer manager
check:
2023-03-31 15:21:40 +00:00
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).allIt(
it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterSubscribeCodec)
)
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(
it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec)
)
await node.stop()
asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Test default connectedness for new peers
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
check:
# No information about node2's connectedness
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) ==
NotConnected
# Failed connection
let nonExistentPeerRes = parsePeerInfo(
"/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmQSMNExfUYUqfuXWkD5DaNZnMYnigRxFKbk3tcEFQeQeE"
)
require:
nonExistentPeerRes.isOk()
let nonExistentPeer = nonExistentPeerRes.value
require:
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
await sleepAsync(chronos.milliseconds(500))
check:
# Cannot connect to node2
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) ==
CannotConnect
# Successful connection
require:
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) ==
true
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
# Stop node. Gracefully disconnect from all peers.
await nodes[0].stop()
check:
# Not currently connected to node2, but had recent, successful connection.
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) ==
CanConnect
await nodes[1].stop()
asyncTest "Peer manager updates failed peers correctly":
# Create 2 nodes
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
let nonExistentPeerRes = parsePeerInfo(
"/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmQSMNExfUYUqfuXWkD5DaNZnMYnigRxFKbk3tcEFQeQeE"
)
require nonExistentPeerRes.isOk()
let nonExistentPeer = nonExistentPeerRes.value
nodes[0].peerManager.addPeer(nonExistentPeer)
# Set a low backoff to speed up test: 2, 4, 8, 16
nodes[0].peerManager.initialBackoffInSec = 2
nodes[0].peerManager.backoffFactor = 2
# try to connect to peer that doesnt exist
let conn1Ok = await nodes[0].peerManager.connectRelay(nonExistentPeer)
check:
# Cannot connect to node2
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) ==
CannotConnect
nodes[0].peerManager.peerStore[ConnectionBook][nonExistentPeer.peerId] ==
CannotConnect
nodes[0].peerManager.peerStore[NumberFailedConnBook][nonExistentPeer.peerId] == 1
# Connection attempt failed
conn1Ok == false
# Right after failing there is a backoff period
nodes[0].peerManager.canBeConnected(nonExistentPeer.peerId) == false
# We wait the first backoff period
await sleepAsync(chronos.milliseconds(2100))
# And backoff period is over
check:
nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true
# After a successful connection, the number of failed connections is reset
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
let conn2Ok =
await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
check:
conn2Ok == true
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer manager can use persistent storage and survive restarts":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
)
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"
check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] ==
remotePeerInfo2.addrs
# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage,
)
node3.mountMetadata(0).expect("Mounted Waku Metadata")
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()
await sleepAsync(chronos.milliseconds(500))
check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Sharded peer manager can use persistent storage and survive restarts":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
)
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
await node1.start()
await node2.start()
await node1.mountRelay()
await node2.mountRelay()
let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"
check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] ==
remotePeerInfo2.addrs
# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage,
)
node3.mountMetadata(0).expect("Mounted Waku Metadata")
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay()
await node3.peerManager.manageRelayPeers()
await sleepAsync(chronos.milliseconds(500))
check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager drops conections to peers on different networks":
let clusterId3 = 3.uint32
let clusterId4 = 4.uint32
let
# different network
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId3,
pubsubTopics = @["/waku/2/rs/3/0"],
)
# same network
node2 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
pubsubTopics = @["/waku/2/rs/4/0"],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
pubsubTopics = @["/waku/2/rs/4/0"],
)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
# 1->2 (fails)
let conn1 = await node1.peerManager.dialPeer(
node2.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
# 1->3 (fails)
let conn2 = await node1.peerManager.dialPeer(
node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
# 2->3 (succeeds)
let conn3 = await node2.peerManager.dialPeer(
node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
check:
conn1.isNone
conn2.isNone
conn3.isSome
# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
peerStorage = storage,
)
node2 =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
2022-01-10 15:07:35 +00:00
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
await node1.start()
await node2.start()
await node1.mountRelay()
node1.wakuRelay.codec = betaCodec
await node2.mountRelay()
node2.wakuRelay.codec = betaCodec
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.peers().anyIt(
it.protocols.contains(node2.wakuRelay.codec)
)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
peerStorage = storage,
)
await node3.mountRelay()
node3.wakuRelay.codec = stableCodec
check:
# Node 2 and 3 have differing codecs
node2.wakuRelay.codec == betaCodec
node3.wakuRelay.codec == stableCodec
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.start() # This should trigger a reconnect
check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0 ..< 4).mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay])),
)
)
# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
# Get all peer infos
let peerInfos = collect:
for i in 0 .. nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])
# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()
check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[1].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[2].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[3].switch.peerInfo.peerId
)
# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] ==
Connected
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Sharded peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0 ..< 4).mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay])),
)
)
# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
# Get all peer infos
let peerInfos = collect:
for i in 0 .. nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])
# Connect to relay peers
await nodes[0].peerManager.manageRelayPeers()
check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[1].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[2].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[3].switch.peerInfo.peerId
)
# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] ==
Connected
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
let nodes = toSeq(0 ..< 4).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
# Start them
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# all nodes connect to peer 0
require:
(await nodes[1].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true
await sleepAsync(chronos.milliseconds(500))
check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
# Inbound/Outbound number of peers match
nodes[0].peerManager.peerStore.getPeersByDirection(Inbound).len == 3
nodes[0].peerManager.peerStore.getPeersByDirection(Outbound).len == 0
nodes[1].peerManager.peerStore.getPeersByDirection(Inbound).len == 0
nodes[1].peerManager.peerStore.getPeersByDirection(Outbound).len == 1
nodes[2].peerManager.peerStore.getPeersByDirection(Inbound).len == 0
nodes[2].peerManager.peerStore.getPeersByDirection(Outbound).len == 1
nodes[3].peerManager.peerStore.getPeersByDirection(Inbound).len == 0
nodes[3].peerManager.peerStore.getPeersByDirection(Outbound).len == 1
# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[1].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[2].switch.peerInfo.peerId
)
nodes[0].peerManager.peerStore.peers().anyIt(
it.peerId == nodes[3].switch.peerInfo.peerId
)
# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(
WakuRelayCodec
)
# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] ==
Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] ==
Connected
# All peers are Inbound in peer 0
nodes[0].peerManager.peerStore[DirectionBook][nodes[1].switch.peerInfo.peerId] ==
Inbound
nodes[0].peerManager.peerStore[DirectionBook][nodes[2].switch.peerInfo.peerId] ==
Inbound
nodes[0].peerManager.peerStore[DirectionBook][nodes[3].switch.peerInfo.peerId] ==
Inbound
# All peers have an Outbound connection with peer 0
nodes[1].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] ==
Outbound
nodes[2].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] ==
Outbound
nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] ==
Outbound
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer store addServicePeer() stores service peers":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
let
node =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peers = toSeq(1 .. 4)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
.filterIt(it.isOk())
.mapIt(it.value)
require:
peers.len == 4
# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec)
# relay peers (should not be added)
node.peerManager.addServicePeer(peers[3], WakuRelayCodec)
# all peers are stored in the peerstore
check:
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId)
# but the relay peer is not
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == false
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId
# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
asyncTest "connectedPeers() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0 ..< 4).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# create some connections/streams
check:
# some relay connections
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterSubscribeCodec)).isSome() ==
true
# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterSubscribeCodec)).isSome() ==
true
# assert physical connections
check:
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 2
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0
asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0 ..< 2).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (0, 4)
nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (4, 0)
test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise().build(),
storage = nil,
)
# Create 3 peer infos
let peers = toSeq(1 .. 3)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
.filterIt(it.isOk())
.mapIt(it.value)
require:
peers.len == 3
# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] =
@[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec]
# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer1.isSome() == true
selectedPeer1.get().peerId == peers[0].peerId
# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
# And return none if we dont have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer3.isSome() == false
# Now we add service peers for different protocols peer[1..3]
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLightPushCodec)
# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId
let selectedPeer5 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId
test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections
let peerStoreSize = 5
let maxConnections = 10
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(peerStoreSize)
.withMaxConnections(maxConnections)
.build(),
storage = nil,
)
test "prunePeerStore() correctly removes peers to match max quota":
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = some(5),
storage = nil,
)
# Create 15 peers and add them to the peerstore
let peers = toSeq(1 .. 15)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
.filterIt(it.isOk())
.mapIt(it.value)
for p in peers:
pm.addPeer(p)
# Check that we have 15 peers in the peerstore
check:
pm.peerStore.peers.len == 15
# fake that some peers failed to connected
pm.peerStore[NumberFailedConnBook][peers[0].peerId] = 2
pm.peerStore[NumberFailedConnBook][peers[1].peerId] = 2
pm.peerStore[NumberFailedConnBook][peers[2].peerId] = 2
# fake that some peers are connected
pm.peerStore[ConnectionBook][peers[5].peerId] = Connected
pm.peerStore[ConnectionBook][peers[8].peerId] = Connected
pm.peerStore[ConnectionBook][peers[10].peerId] = Connected
pm.peerStore[ConnectionBook][peers[12].peerId] = Connected
# Prune the peerstore (current=15, target=5)
pm.prunePeerStore()
check:
# ensure peerstore was pruned
pm.peerStore.peers.len == 10
# ensure connected peers were not pruned
pm.peerStore.peers.anyIt(it.peerId == peers[5].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[8].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[10].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[12].peerId)
# ensure peers that failed were the first to be pruned
not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)
asyncTest "canBeConnected() returns correct value":
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
initialBackoffInSec = 1,
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
# new peer with no errors can be connected
check:
pm.canBeConnected(p1) == true
# peer with ONE error that just failed
pm.peerStore[NumberFailedConnBook][p1] = 1
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# we cant connect right now
check:
pm.canBeConnected(p1) == false
# but we can after the first backoff of 1 seconds
await sleepAsync(chronos.milliseconds(1200))
check:
pm.canBeConnected(p1) == true
# peer with TWO errors, we can connect until 2 seconds have passed
pm.peerStore[NumberFailedConnBook][p1] = 2
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# cant be connected after 1 second
await sleepAsync(chronos.milliseconds(1000))
check:
pm.canBeConnected(p1) == false
# can be connected after 2 seconds
await sleepAsync(chronos.milliseconds(1200))
check:
pm.canBeConnected(p1) == true
# can't be connected if failed attempts are equal to maxFailedAttempts
pm.maxFailedAttempts = 2
check:
pm.canBeConnected(p1) == false
test "peer manager must fail if max backoff is over a week":
# Should result in overflow exception
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil,
)
# Should result in backoff > 1 week
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
let pm = PeerManager.new(
switch = SwitchBuilder
.new()
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = some(5),
storage = nil,
)
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
# Create 5 nodes
let nodes = toSeq(0 ..< 5).mapIt(
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
)
# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1
# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])
await sleepAsync(chronos.milliseconds(500))
# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
await sleepAsync(chronos.milliseconds(500))
# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
# we should have 4 peers (2in/2out) but due to collocation limit
# they are pruned to max 1
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
nodes[0].peerManager.switch.connManager.getConnections().len == 1
nodes[0].peerManager.peerStore.peers().len == 1
await allFutures(nodes.mapIt(it.stop()))