diff --git a/tests/node/peer_manager/peer_store/utils.nim b/tests/node/peer_manager/peer_store/utils.nim new file mode 100644 index 000000000..8c631b508 --- /dev/null +++ b/tests/node/peer_manager/peer_store/utils.nim @@ -0,0 +1,12 @@ +import std/options, stew/results, libp2p/peerstore + +import + ../../../../waku/node/peer_manager/[waku_peer_store, peer_store/waku_peer_storage], + ../../../waku_archive/archive_utils + +proc newTestWakuPeerStorage*(path: Option[string] = string.none()): WakuPeerStorage = + let db = newSqliteDatabase(path) + WakuPeerStorage.new(db).value() + +proc peerExists*(peerStore: PeerStore, peerId: PeerId): bool = + return peerStore[AddressBook].contains(peerId) diff --git a/tests/node/test_all.nim b/tests/node/test_all.nim index 308550c88..6c3d76175 100644 --- a/tests/node/test_all.nim +++ b/tests/node/test_all.nim @@ -3,4 +3,5 @@ import ./test_wakunode_lightpush, ./test_wakunode_peer_exchange, ./test_wakunode_store, - ./test_wakunode_legacy_store + ./test_wakunode_legacy_store, + ./test_wakunode_peer_manager diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim new file mode 100644 index 000000000..c4c930797 --- /dev/null +++ b/tests/node/test_wakunode_peer_manager.nim @@ -0,0 +1,957 @@ +{.used.} + +import + os, + std/[options, tables], + stew/shims/net as stewNet, + testutils/unittests, + chronos, + # chronos/timer, + chronicles, + libp2p/[peerstore, crypto/crypto, multiaddress] + +from times import getTime, toUnix + +import + ../../../waku/[ + waku_core, + node/peer_manager, + node/waku_node, + waku_enr/sharding, + discovery/waku_discv5, + waku_filter_v2/common, + waku_relay/protocol, + ], + ../testlib/ + [wakucore, wakunode, testasync, testutils, assertions, comparisons, futures], + ../waku_enr/utils, + ../waku_archive/archive_utils, + ../waku_discv5/utils, + ./peer_manager/peer_store/utils, + ./utils + +const DEFAULT_PROTOCOLS: seq[string] = + @["/ipfs/id/1.0.0", "/libp2p/autonat/1.0.0", "/libp2p/circuit/relay/0.2.0/hop"] + +let + listenIp = ValidIpAddress.init("0.0.0.0") + listenPort = Port(0) + +suite "Peer Manager": + var + pubsubTopic {.threadvar.}: PubsubTopic + contentTopic {.threadvar.}: ContentTopic + + var + server {.threadvar.}: WakuNode + serverPeerStore {.threadvar.}: PeerStore + client {.threadvar.}: WakuNode + clientPeerStore {.threadvar.}: PeerStore + + var + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + serverPeerId {.threadvar.}: PeerId + clientRemotePeerInfo {.threadvar.}: RemotePeerInfo + clientPeerId {.threadvar.}: PeerId + + asyncSetup: + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, listenIp, listenPort) + serverPeerStore = server.peerManager.peerStore + client = newTestWakuNode(clientKey, listenIp, listenPort) + clientPeerStore = client.peerManager.peerStore + + await allFutures(server.start(), client.start()) + + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + clientRemotePeerInfo = client.switch.peerInfo.toRemotePeerInfo() + clientPeerId = clientRemotePeerInfo.peerId + + asyncTeardown: + await allFutures(server.stop(), client.stop()) + + suite "Peer Connectivity, Management, and Store": + asyncTest "Peer Connection Validation": + # When a client connects to a server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the server should have the client in its peer store + check: + clientPeerStore.peerExists(serverRemotePeerInfo.peerId) + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + asyncTest "Graceful Handling of Non-Existent Peers": + # Given a non existent RemotePeerInfo + let + privKey = generateSecp256k1Key() + extIp = "127.0.0.1" + tcpPort = 61500u16 + udpPort = 9000u16 + nonExistentRecord = newTestEnrRecord( + privKey = privKey, extIp = extIp, tcpPort = tcpPort, udpPort = udpPort + ) + nonExistentRemotePeerInfo = nonExistentRecord.toRemotePeerInfo().value() + + # When a client connects to the non existent peer + await client.connectToNodes(@[nonExistentRemotePeerInfo]) + + # Then the client exists in the peer store but is marked as a failed connection + let parsedRemotePeerInfo = clientPeerStore.get(nonExistentRemotePeerInfo.peerId) + check: + clientPeerStore.peerExists(nonExistentRemotePeerInfo.peerId) + parsedRemotePeerInfo.connectedness == CannotConnect + parsedRemotePeerInfo.lastFailedConn <= Moment.init(getTime().toUnix, Second) + parsedRemotePeerInfo.numberFailedConn == 1 + + suite "Peer Store Pruning": + asyncTest "Capacity is not exceeded": + # Given the client's peer store has a capacity of 1 + clientPeerStore.capacity = 1 + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then no peers are removed + check: + clientPeerStore.peers().len == 1 + + asyncTest "Capacity is not exceeded but some peers are unhealthy": + # Given the client's peer store has a capacity of 1 + clientPeerStore.capacity = 1 + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # Given the server is marked as CannotConnect + client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then no peers are removed + check: + clientPeerStore.peers().len == 1 + + asyncTest "Capacity is exceeded but all peers are healthy": + # Given the client's peer store has a capacity of 0 + clientPeerStore.capacity = 0 + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then no peers are removed + check: + clientPeerStore.peers().len == 1 + + asyncTest "Failed connections": + # Given the client's peer store has a capacity of 0 and maxFailedAttempts of 1 + clientPeerStore.capacity = 0 + client.peerManager.maxFailedAttempts = 1 + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # Given the server is marked as having 1 failed connection + client.peerManager.peerStore[NumberFailedConnBook].book[serverPeerId] = 1 + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then the server is removed from the client's peer store + check: + clientPeerStore.peers().len == 0 + + asyncTest "Shardless": + # Given the client's peer store has a capacity of 0 + clientPeerStore.capacity = 0 + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # Given the server is marked as not connected + client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then the server is removed from the client's peer store + check: + clientPeerStore.peers().len == 0 + + asyncTest "Higher than avg shard count": + # Given the client's peer store has a capacity of 0 + clientPeerStore.capacity = 0 + + # And the server's remote peer info contains the node's ENR + serverRemotePeerInfo.enr = some(server.enr) + + # And the client connects to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + check: + clientPeerStore.peers().len == 1 + + # Given the server is marked as not connected + # (There's only one shard in the ENR so avg shards will be the same as the shard count; hence it will be purged.) + client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + + # When pruning the client's store + client.peerManager.prunePeerStore() + + # Then the server is removed from the client's peer store + check: + clientPeerStore.peers().len == 0 + + suite "Enforcing Colocation Limits": + asyncTest "Without colocation limits": + # Given two extra clients + let + client2Key = generateSecp256k1Key() + client3Key = generateSecp256k1Key() + client2 = newTestWakuNode(client2Key, listenIp, listenPort) + client3 = newTestWakuNode(client3Key, listenIp, listenPort) + + await allFutures(client2.start(), client3.start()) + + # And the server's peer manager has no colocation limit + server.peerManager.colocationLimit = 0 + + # When all clients connect to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + await client2.connectToNodes(@[serverRemotePeerInfo]) + await client3.connectToNodes(@[serverRemotePeerInfo]) + + # Then the server should have all clients in its peer store + check: + serverPeerStore.peers().len == 3 + + # Teardown + await allFutures(client2.stop(), client3.stop()) + + asyncTest "With colocation limits": + # Given two extra clients + let + client2Key = generateSecp256k1Key() + client3Key = generateSecp256k1Key() + client2 = newTestWakuNode(client2Key, listenIp, listenPort) + client3 = newTestWakuNode(client3Key, listenIp, listenPort) + + await allFutures(client2.start(), client3.start()) + + # And the server's peer manager has a colocation limit of 1 + server.peerManager.colocationLimit = 1 + + # When all clients connect to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + await client2.connectToNodes(@[serverRemotePeerInfo]) + await client3.connectToNodes(@[serverRemotePeerInfo]) + + # Then the server should have only 1 client in its peer store + check: + serverPeerStore.peers().len == 1 + + # Teardown + await allFutures(client2.stop(), client3.stop()) + + suite "In-memory Data Structure Verification": + asyncTest "Cannot add self": + # When trying to add self to the peer store + client.peerManager.addPeer(clientRemotePeerInfo) + + # Then the peer store should not contain the peer + check: + not clientPeerStore.peerExists(clientPeerId) + + asyncTest "Peer stored in peer store": + # When adding a peer other than self to the peer store + client.peerManager.addPeer(serverRemotePeerInfo) + + # Then the peer store should contain the peer + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore[AddressBook][serverPeerId] == serverRemotePeerInfo.addrs + + suite "Protocol-Specific Peer Handling": + asyncTest "Peer Protocol Support Verification - No waku protocols": + # When connecting to a server with no Waku protocols + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the stored protocols should be the default (libp2p) ones + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + + asyncTest "Peer Protocol Support Verification (Before Connection)": + # Given the server has mounted some Waku protocols + await server.mountRelay() + await server.mountFilter() + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the stored protocols should include the Waku protocols + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec, WakuFilterSubscribeCodec] + + asyncTest "Service-Specific Peer Addition": + # Given a server mounts some Waku protocols + await server.mountFilter() + + # And another server that mounts different Waku protocols + let + server2Key = generateSecp256k1Key() + server2 = newTestWakuNode(server2Key, listenIp, listenPort) + + await server2.start() + + let + server2RemotePeerInfo = server2.switch.peerInfo.toRemotePeerInfo() + server2PeerId = server2RemotePeerInfo.peerId + + await server2.mountRelay() + + # When connecting to both servers + await client.connectToNodes(@[serverRemotePeerInfo, server2RemotePeerInfo]) + + # Then the peer store should contain both peers with the correct protocols + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuFilterSubscribeCodec] + clientPeerStore.peerExists(server2PeerId) + clientPeerStore.get(server2PeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec] + + # Cleanup + await server2.stop() + + suite "Tracked Peer Metadata": + xasyncTest "Metadata Recording": + # When adding a peer other than self to the peer store + serverRemotePeerInfo.enr = some(server.enr) + client.peerManager.addPeer(serverRemotePeerInfo) + + # Then the peer store should contain the peer + check clientPeerStore.peerExists(serverPeerId) + + # And all the peer's information should be stored + check: + clientPeerStore[AddressBook][serverPeerId] == serverRemotePeerInfo.addrs + clientPeerStore[ENRBook][serverPeerId].raw == + serverRemotePeerInfo.enr.get().raw + chainedComparison( + clientPeerStore[ProtoBook][serverPeerId], + serverRemotePeerInfo.protocols, + DEFAULT_PROTOCOLS, + ) + chainedComparison( + clientPeerStore[AgentBook][serverPeerId], # FIXME: Not assigned + serverRemotePeerInfo.agent, + "nim-libp2p/0.0.1", + ) + chainedComparison( + clientPeerStore[ProtoVersionBook][serverPeerId], # FIXME: Not assigned + serverRemotePeerInfo.protoVersion, + "ipfs/0.1.0", + ) + clientPeerStore[KeyBook][serverPeerId] == serverRemotePeerInfo.publicKey + chainedComparison( + clientPeerStore[ConnectionBook][serverPeerId], + serverRemotePeerInfo.connectedness, + NOT_CONNECTED, + ) + chainedComparison( + clientPeerStore[DisconnectBook][serverPeerId], + serverRemotePeerInfo.disconnectTime, + 0, + ) + chainedComparison( + clientPeerStore[SourceBook][serverPeerId], + serverRemotePeerInfo.origin, + UnknownOrigin, + ) + chainedComparison( + clientPeerStore[DirectionBook][serverPeerId], + serverRemotePeerInfo.direction, + UnknownDirection, + ) + chainedComparison( + clientPeerStore[LastFailedConnBook][serverPeerId], + serverRemotePeerInfo.lastFailedConn, + Moment.init(0, Second), + ) + chainedComparison( + clientPeerStore[NumberFailedConnBook][serverPeerId], + serverRemotePeerInfo.numberFailedConn, + 0, + ) + + xasyncTest "Metadata Accuracy": + # Given a second server + let + server2Key = generateSecp256k1Key() + server2 = newTestWakuNode(server2Key, listenIp, listenPort) + server2RemotePeerInfo = server2.switch.peerInfo.toRemotePeerInfo() + server2PeerId = server2RemotePeerInfo.peerId + + await server2.start() + + # When the client connects to both servers + await client.connectToNodes(@[serverRemotePeerInfo, server2RemotePeerInfo]) + + # Then the peer store should contain both peers with the correct metadata + check: + # Server + clientPeerStore[AddressBook][serverPeerId] == serverRemotePeerInfo.addrs + clientPeerStore[ENRBook][serverPeerId].raw == + serverRemotePeerInfo.enr.get().raw + chainedComparison( + clientPeerStore[ProtoBook][serverPeerId], + serverRemotePeerInfo.protocols, + DEFAULT_PROTOCOLS, + ) + chainedComparison( + clientPeerStore[AgentBook][serverPeerId], # FIXME: Not assigned + serverRemotePeerInfo.agent, + "nim-libp2p/0.0.1", + ) + chainedComparison( + clientPeerStore[ProtoVersionBook][serverPeerId], # FIXME: Not assigned + serverRemotePeerInfo.protoVersion, + "ipfs/0.1.0", + ) + clientPeerStore[KeyBook][serverPeerId] == serverRemotePeerInfo.publicKey + chainedComparison( + clientPeerStore[ConnectionBook][serverPeerId], + serverRemotePeerInfo.connectedness, + NOT_CONNECTED, + ) + chainedComparison( + clientPeerStore[DisconnectBook][serverPeerId], + serverRemotePeerInfo.disconnectTime, + 0, + ) + chainedComparison( + clientPeerStore[SourceBook][serverPeerId], + serverRemotePeerInfo.origin, + UnknownOrigin, + ) + chainedComparison( + clientPeerStore[DirectionBook][serverPeerId], + serverRemotePeerInfo.direction, + UnknownDirection, + ) + chainedComparison( + clientPeerStore[LastFailedConnBook][serverPeerId], + serverRemotePeerInfo.lastFailedConn, + Moment.init(0, Second), + ) + chainedComparison( + clientPeerStore[NumberFailedConnBook][serverPeerId], + serverRemotePeerInfo.numberFailedConn, + 0, + ) + + # Server 2 + clientPeerStore[AddressBook][server2PeerId] == server2RemotePeerInfo.addrs + clientPeerStore[ENRBook][server2PeerId].raw == + server2RemotePeerInfo.enr.get().raw + chainedComparison( + clientPeerStore[ProtoBook][server2PeerId], + server2RemotePeerInfo.protocols, + DEFAULT_PROTOCOLS, + ) + chainedComparison( + clientPeerStore[AgentBook][server2PeerId], # FIXME: Not assigned + server2RemotePeerInfo.agent, + "nim-libp2p/0.0.1", + ) + chainedComparison( + clientPeerStore[ProtoVersionBook][server2PeerId], # FIXME: Not assigned + server2RemotePeerInfo.protoVersion, + "ipfs/0.1.0", + ) + clientPeerStore[KeyBook][serverPeerId] == server2RemotePeerInfo.publicKey + chainedComparison( + clientPeerStore[ConnectionBook][server2PeerId], + server2RemotePeerInfo.connectedness, + NOT_CONNECTED, + ) + chainedComparison( + clientPeerStore[DisconnectBook][server2PeerId], + server2RemotePeerInfo.disconnectTime, + 0, + ) + chainedComparison( + clientPeerStore[SourceBook][server2PeerId], + server2RemotePeerInfo.origin, + UnknownOrigin, + ) + chainedComparison( + clientPeerStore[DirectionBook][server2PeerId], + server2RemotePeerInfo.direction, + UnknownDirection, + ) + chainedComparison( + clientPeerStore[LastFailedConnBook][server2PeerId], + server2RemotePeerInfo.lastFailedConn, + Moment.init(0, Second), + ) + chainedComparison( + clientPeerStore[NumberFailedConnBook][server2PeerId], + server2RemotePeerInfo.numberFailedConn, + 0, + ) + + suite "Peer Connectivity States": + asyncTest "State Tracking & Transition": + # Given two correctly initialised nodes, but not connected + await server.mountRelay() + await client.mountRelay() + + # Then their connectedness should be NotConnected + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.NotConnected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.NotConnected + + # When connecting the client to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then both peers' connectedness should be Connected + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + # When stopping the switches of either of the peers + # (Running just one stop is enough to change the states in both peers, but I'll leave both calls as an example) + await server.switch.stop() + await client.switch.stop() + + # Then both peers are gracefully disconnected, and turned to CanConnect + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + # When trying to connect those peers to a non-existent peer + # Generate an invalid multiaddress, and patching both peerInfos with it so dialing fails + let + port = Port(8080) + ipAddress = IpAddress(family: IPv4, address_v4: [192, 168, 0, 1]) + multiaddress = + MultiAddress.init(ipAddress, IpTransportProtocol.tcpProtocol, port) + serverRemotePeerInfo.addrs = @[multiaddress] + clientRemotePeerInfo.addrs = @[multiaddress] + await client.connectToNodes(@[serverRemotePeerInfo]) + await server.connectToNodes(@[clientRemotePeerInfo]) + + # Then both peers should be marked as CannotConnect + check: + clientPeerStore.get(serverPeerId).connectedness == + Connectedness.CannotConnect + serverPeerStore.get(clientPeerId).connectedness == + Connectedness.CannotConnect + + suite "Automatic Reconnection": + xasyncTest "Automatic Reconnection Implementation": + # Given two correctly initialised nodes, that are available for reconnection + await server.mountRelay() + await client.mountRelay() + await client.connectToNodes(@[serverRemotePeerInfo]) + await server.switch.stop() + await client.switch.stop() + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + # When triggering the reconnection + await client.peerManager.reconnectPeers(WakuRelayCodec) + + # Then both peers should be marked as Connected + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + xasyncTest "Automatic Reconnection Implementation (With Backoff)": + # Given two correctly initialised nodes, that are available for reconnection + await server.mountRelay() + await client.mountRelay() + await client.connectToNodes(@[serverRemotePeerInfo]) + waitFor allFutures(server.switch.stop(), client.switch.stop()) + waitFor allFutures(server.switch.start(), client.switch.start()) + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + # When triggering a reconnection with a backoff period + let + backoffPeriod = 10.seconds + halfBackoffPeriod = 5.seconds + + await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod) + await sleepAsync(halfBackoffPeriod) + + # If the backoff period is not over, then the peers should still be marked as CanConnect + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + # When waiting for the backoff period to be over + await sleepAsync(halfBackoffPeriod) + + # Then both peers should be marked as Connected + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + + xasyncTest "Automatic Reconnection Implementation (After client restart)": + # Given two correctly initialised nodes, that are available for reconnection + await server.mountRelay() + await client.mountRelay() + await client.connectToNodes(@[serverRemotePeerInfo]) + await server.switch.stop() + await client.switch.stop() + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect + serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + + # When triggering the reconnection, and some time for the reconnection to happen + waitFor allFutures(client.stop(), server.stop()) + await allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT_LONG) + + # Then both peers should be marked as Connected + check: + clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + +suite "Handling Connections on Different Networks": + # TODO: Implement after discv5 and peer manager's interaction is understood + proc buildNode( + tcpPort: uint16, + udpPort: uint16, + bindIp: string = "0.0.0.0", + extIp: string = "127.0.0.1", + indices: seq[uint64] = @[], + recordFlags: Option[CapabilitiesBitfield] = none(CapabilitiesBitfield), + bootstrapRecords: seq[waku_enr.Record] = @[], + ): (WakuDiscoveryV5, Record) = + let + privKey = generateSecp256k1Key() + record = newTestEnrRecord( + privKey = privKey, + extIp = extIp, + tcpPort = tcpPort, + udpPort = udpPort, + indices = indices, + flags = recordFlags, + ) + node = newTestDiscv5( + privKey = privKey, + bindIp = bindIp, + tcpPort = tcpPort, + udpPort = udpPort, + record = record, + bootstrapRecords = bootstrapRecords, + ) + + (node, record) + + asyncTest "Same cluster but different shard": + # peer 1 is on cluster x - shard a ; peer 2 is on cluster x - shard b + # todo: Implement after discv5 and peer manager's interaction is understood + discard + + xasyncTest "Different cluster but same shard": + # peer 1 is on cluster x - shard a ; peer 2 is on cluster y - shard a + # todo: Implement after discv5 and peer manager's interaction is understood + discard + + xasyncTest "Different cluster and different shard": + # peer 1 is on cluster x - shard a ; peer 2 is on cluster y - shard b + # todo: Implement after discv5 and peer manager's interaction is understood + discard + + xasyncTest "Same cluster with multiple shards (one shared)": + # peer 1 is on cluster x - shard [a,b,c] ; peer 2 is on cluster x - shard [c, d, e] + # todo: Implement after discv5 and peer manager's interaction is understood + discard + +const baseDbPath = "./peers.test.db" +proc cleanupDb() = + os.removeFile(baseDbPath) + os.removeFile(baseDbPath & "-shm") + os.removeFile(baseDbPath & "-wal") + +suite "Persistence Check": + asyncTest "PeerStorage exists": + # Cleanup previous existing db + cleanupDb() + + # Given an on-disk peer db exists, with a peer in it; and two connected nodes + let + clientPeerStorage = newTestWakuPeerStorage(some(baseDbPath)) + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + client = newTestWakuNode( + clientKey, listenIp, listenPort, peerStorage = clientPeerStorage + ) + serverPeerStore = server.peerManager.peerStore + clientPeerStore = client.peerManager.peerStore + + await allFutures(server.start(), client.start()) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + check: + clientPeerStore.peers().len == 1 + + await allFutures(server.stop(), client.stop()) + + # When initializing a new client using the prepopulated on-disk storage + let + newClientPeerStorage = newTestWakuPeerStorage(some(baseDbPath)) + newClient = newTestWakuNode( + clientKey, listenIp, listenPort, peerStorage = newClientPeerStorage + ) + newClientPeerStore = newClient.peerManager.peerStore + + await newClient.start() + + # Then the new client should have the same peer in its peer store + check: + newClientPeerStore.peers().len == 1 + + # Cleanup + await newClient.stop() + cleanupDb() + + asyncTest "PeerStorage exists but no data": + # Cleanup previous existing db + cleanupDb() + + # When creating a new server with memory storage, and a client with on-disk peer storage + let + clientPeerStorage = newTestWakuPeerStorage(some(baseDbPath)) + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + client = newTestWakuNode( + clientKey, listenIp, listenPort, peerStorage = clientPeerStorage + ) + serverPeerStore = server.peerManager.peerStore + clientPeerStore = client.peerManager.peerStore + + await allFutures(server.start(), client.start()) + + # Then the client's peer store should be empty + check: + clientPeerStore.peers().len == 0 + + # Cleanup + await allFutures(server.stop(), client.stop()) + cleanupDb() + + asyncTest "PeerStorage not exists": + # When creating a new server and client, both without peer storage + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + client = newTestWakuNode(clientKey, listenIp, listenPort) + serverPeerStore = server.peerManager.peerStore + clientPeerStore = client.peerManager.peerStore + + await allFutures(server.start(), client.start()) + + # Then the client's peer store should be empty + check: + clientPeerStore.peers().len == 0 + + # Cleanup + await allFutures(server.stop(), client.stop()) + +suite "Mount Order": + var + client {.threadvar.}: WakuNode + clientRemotePeerInfo {.threadvar.}: RemotePeerInfo + clientPeerStore {.threadvar.}: PeerStore + + asyncSetup: + let clientKey = generateSecp256k1Key() + + client = newTestWakuNode(clientKey, listenIp, listenPort) + clientPeerStore = client.peerManager.peerStore + + await client.start() + + clientRemotePeerInfo = client.switch.peerInfo.toRemotePeerInfo() + + asyncTeardown: + await client.stop() + + asyncTest "protocol-start-info": + # Given a server that is initiaalised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + await server.mountRelay() + await server.start() + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer with the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec] + + # Cleanup + await server.stop() + + asyncTest "protocol-info-start": + # Given a server that is initialised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + await server.mountRelay() + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + await server.start() + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer with the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec] + + # Cleanup + await server.stop() + + asyncTest "start-protocol-info": + # Given a server that is initialised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + await server.start() + await server.mountRelay() + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer with the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec] + + # Cleanup + await server.stop() + + asyncTest "start-info-protocol": + # Given a server that is initialised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + await server.start() + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + await server.mountRelay() + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer with the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == + DEFAULT_PROTOCOLS & @[WakuRelayCodec] + + # Cleanup + await server.stop() + + asyncTest "info-start-protocol": + # Given a server that is initialised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + await server.start() + await server.mountRelay() + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer but not the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + + # Cleanup + await server.stop() + + asyncTest "info-protocol-start": + # Given a server that is initialised in the order defined in the title + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, listenIp, listenPort) + + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + serverPeerId = serverRemotePeerInfo.peerId + await server.mountRelay() + await server.start() + + # When connecting to the server + await client.connectToNodes(@[serverRemotePeerInfo]) + + # Then the peer store should contain the peer but not the mounted protocol + check: + clientPeerStore.peerExists(serverPeerId) + clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + + # Cleanup + await server.stop() diff --git a/tests/node/utils.nim b/tests/node/utils.nim new file mode 100644 index 000000000..eb0e6491d --- /dev/null +++ b/tests/node/utils.nim @@ -0,0 +1,8 @@ +import std/options, results +import + ../../../waku/ + [node/peer_manager, node/waku_node, waku_enr/sharding, common/enr/typed_record], + ../testlib/[wakucore] + +proc relayShards*(node: WakuNode): RelayShards = + return node.enr.toTyped().get().relayShardingIndicesList().get() diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4c4408614..cb033385d 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -105,8 +105,7 @@ procSuite "Peer Manager": let nonExistentPeer = nonExistentPeerRes.value # Dial non-existent peer from node1 - let conn1 = - await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec) + let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec) check: conn1.isNone() @@ -410,16 +409,13 @@ procSuite "Peer Manager": await allFutures([node1.stop(), node2.stop(), node3.stop()]) asyncTest "Peer manager drops conections to peers on different networks": - let clusterId3 = 3.uint32 - let clusterId4 = 4.uint32 - let + port = Port(0) # different network node1 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), - Port(0), - clusterId = clusterId3, + port, pubsubTopics = @["/waku/2/rs/3/0"], ) @@ -427,21 +423,19 @@ procSuite "Peer Manager": node2 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), - Port(0), - clusterId = clusterId4, + port, pubsubTopics = @["/waku/2/rs/4/0"], ) node3 = newTestWakuNode( generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), - Port(0), - clusterId = clusterId4, + port, pubsubTopics = @["/waku/2/rs/4/0"], ) - node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata") - node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata") - node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata") + node1.mountMetadata(3).expect("Mounted Waku Metadata") + node2.mountMetadata(4).expect("Mounted Waku Metadata") + node3.mountMetadata(4).expect("Mounted Waku Metadata") # Start nodes await allFutures([node1.start(), node2.start(), node3.start()]) diff --git a/tests/testlib/comparisons.nim b/tests/testlib/comparisons.nim new file mode 100644 index 000000000..cdc2404c2 --- /dev/null +++ b/tests/testlib/comparisons.nim @@ -0,0 +1,2 @@ +template chainedComparison*(a: untyped, b: untyped, c: untyped): bool = + a == b and b == c diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim index cfb6431e1..7dd5dfaa0 100644 --- a/tests/testlib/futures.nim +++ b/tests/testlib/futures.nim @@ -4,6 +4,7 @@ import ../../../waku/[waku_core/message, waku_store, waku_store_legacy] const FUTURE_TIMEOUT* = 1.seconds + FUTURE_TIMEOUT_MEDIUM* = 5.seconds FUTURE_TIMEOUT_LONG* = 10.seconds FUTURE_TIMEOUT_SHORT* = 100.milliseconds diff --git a/tests/testlib/simple_mock.nim b/tests/testlib/simple_mock.nim index 18bc9ed34..234647d33 100644 --- a/tests/testlib/simple_mock.nim +++ b/tests/testlib/simple_mock.nim @@ -6,7 +6,7 @@ type Instr {.union.} = object bytes: array[8, byte] value: uint64 -proc mockImpl(target, replacement: pointer) = +proc mockImpl*(target, replacement: pointer) = # YOLO who needs alignment #doAssert (cast[ByteAddress](target) and ByteAddress(0x07)) == 0 var page = cast[pointer](cast[ByteAddress](target) and (not 0xfff)) diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 4bcf35326..444a0fcf5 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -10,6 +10,7 @@ import eth/keys as eth_keys import ../../../waku/waku_node, + ../../../waku/waku_core/topics, ../../../waku/node/peer_manager, ../../../waku/waku_enr, ../../../waku/discovery/waku_discv5, @@ -58,7 +59,6 @@ proc newTestWakuNode*( dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - clusterId: uint32 = 1.uint32, pubsubTopics: seq[string] = @["/waku/2/rs/1/0"], peerStoreCapacity = none(int), ): WakuNode = @@ -73,6 +73,12 @@ proc newTestWakuNode*( var conf = defaultTestWakuNodeConf() + let clusterId = + if pubsubTopics.len() > 0: + NsPubsubTopic.parse(pubsubTopics[0]).get().clusterId + else: + 1.uint16 + conf.clusterId = clusterId conf.pubsubTopics = pubsubTopics diff --git a/tests/waku_archive/archive_utils.nim b/tests/waku_archive/archive_utils.nim index 13d0a533b..df2f80846 100644 --- a/tests/waku_archive/archive_utils.nim +++ b/tests/waku_archive/archive_utils.nim @@ -13,8 +13,8 @@ import ], ../testlib/[wakucore] -proc newSqliteDatabase*(): SqliteDatabase = - SqliteDatabase.new(":memory:").tryGet() +proc newSqliteDatabase*(path: Option[string] = string.none()): SqliteDatabase = + SqliteDatabase.new(path.get(":memory:")).tryGet() proc newSqliteArchiveDriver*(): ArchiveDriver = let database = newSqliteDatabase() diff --git a/tests/waku_discv5/utils.nim b/tests/waku_discv5/utils.nim index 5dd04e6e7..c66c4ca2f 100644 --- a/tests/waku_discv5/utils.nim +++ b/tests/waku_discv5/utils.nim @@ -6,7 +6,8 @@ import eth/keys as eth_keys import - ../../../waku/[waku_core/topics, waku_enr, discovery/waku_discv5], + ../../../waku/ + [waku_core/topics, waku_enr, discovery/waku_discv5, node/peer_manager/peer_manager], ../testlib/[common, wakucore] proc newTestDiscv5*( @@ -17,6 +18,7 @@ proc newTestDiscv5*( record: waku_enr.Record, bootstrapRecords = newSeq[waku_enr.Record](), queue = newAsyncEventQueue[SubscriptionEvent](30), + peerManager: Option[PeerManager] = none(PeerManager), ): WakuDiscoveryV5 = let config = WakuDiscoveryV5Config( privateKey: eth_keys.PrivateKey(privKey.skkey), @@ -26,7 +28,11 @@ proc newTestDiscv5*( ) let discv5 = WakuDiscoveryV5.new( - rng = rng(), conf = config, record = some(record), queue = queue + rng = rng(), + conf = config, + record = some(record), + queue = queue, + peerManager = peerManager, ) return discv5 diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 2ef5112a2..51f0f239a 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -47,7 +47,7 @@ randomize() const # TODO: Make configurable - DefaultDialTimeout = chronos.seconds(10) + DefaultDialTimeout* = chronos.seconds(10) # Max attempts before removing the peer MaxFailedAttempts = 5 @@ -146,6 +146,10 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: + # Reading from the db (pm.storage) is only done on startup, hence you need to connect to all saved peers. + # `remotePeerInfo.connectedness` should already be `NotConnected`, but both we reset it to `NotConnected` just in case. + # This reset is also done when reading from storage, I believe, to ensure the `connectedness` state is the correct one. + # So many resets are likely redudant, but I haven't verified whether this is the case or not. remotePeerInfo.connectedness = NotConnected pm.storage.insertOrReplace(remotePeerInfo) @@ -850,7 +854,7 @@ proc prunePeerStore*(pm: PeerManager) = continue for shard in rs.shardIds: - peersByShard.mgetOrPut(shard, @[peer]).add(peer) + peersByShard.mgetOrPut(shard, @[]).add(peer) # prune not connected peers without shard for peer in shardlessPeers: