From 33774fada0a2be2d960900590186810bcf8e2d52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Tue, 19 Mar 2024 16:18:52 +0100 Subject: [PATCH] fix(2491): Fix metadata protocol disconnecting light nodes (#2533) * Fix metadata protocol disconnecting light nodes. * Implement test cases. --- tests/node/peer_manager/test_peer_manager.nim | 127 ++++++++++++++++++ waku/node/peer_manager/peer_manager.nim | 9 +- waku/node/waku_node.nim | 2 + 3 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 tests/node/peer_manager/test_peer_manager.nim diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim new file mode 100644 index 000000000..f9e8f9a29 --- /dev/null +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -0,0 +1,127 @@ +import + chronicles, + std/[options, tables, strutils], + stew/shims/net, + chronos, + testutils/unittests + +import + ../../../waku/[node/waku_node, waku_core], + ../../waku_lightpush/[lightpush_utils], + ../../testlib/[wakucore, wakunode, futures, testasync], + ../../../../waku/node/peer_manager/peer_manager + +suite "Peer Manager": + suite "onPeerMetadata": + var + listenPort {.threadvar.}: Port + listenAddress {.threadvar.}: IpAddress + serverKey {.threadvar.}: PrivateKey + clientKey {.threadvar.}: PrivateKey + clusterId {.threadvar.}: uint64 + shardTopic0 {.threadvar.}: string + shardTopic1 {.threadvar.}: string + + asyncSetup: + listenPort = Port(0) + listenAddress = ValidIpAddress.init("0.0.0.0") + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + clusterId = 1 + shardTopic0 = "/waku/2/rs/" & $clusterId & "/0" + shardTopic1 = "/waku/2/rs/" & $clusterId & "/1" + + asyncTest "light client is not disconnected": + # Given two nodes with the same shardId + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1]) + + # And both mount metadata and filter + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountFilterClient() + await server.mountFilter() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + discard await client.filterSubscribe( + some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo + ) + await sleepAsync(FUTURE_TIMEOUT) + + check: + server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) + + asyncTest "relay with same shardId is not disconnected": + # Given two nodes with the same shardId + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic0]) + + # And both mount metadata and relay + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountRelay() + await server.mountRelay() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + await sleepAsync(FUTURE_TIMEOUT) + + check: + server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) + + asyncTest "relay with different shardId is disconnected": + # Given two nodes with different shardIds + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1]) + + # And both mount metadata and relay + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountRelay() + await server.mountRelay() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + await sleepAsync(FUTURE_TIMEOUT) + + check: + not server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + not client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 65df13697..c2e245a0e 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -10,7 +10,9 @@ import metrics, libp2p/multistream, libp2p/muxers/muxer, - libp2p/nameresolving/nameresolver + libp2p/nameresolving/nameresolver, + libp2p/peerstore + import ../../common/nimchronos, ../../common/enr, @@ -369,7 +371,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = $clusterId break guardClauses - if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)): + if ( + pm.peerStore.hasPeer(peerId, WakuRelayCodec) and + not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) + ): reason = "no shards in common" break guardClauses diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index dbd985dda..434bd13b5 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -605,6 +605,8 @@ proc filterSubscribe*( contentTopics = contentTopics, peer = remotePeer.peerId + when (contentTopics is ContentTopic): + let contentTopics = @[contentTopics] let subRes = await node.wakuFilterClient.subscribe( remotePeer, pubsubTopic.get(), contentTopics )