diff --git a/apps/wakunode2/config.nim b/apps/wakunode2/config.nim index 9ba5c5eda..6290b5fd9 100644 --- a/apps/wakunode2/config.nim +++ b/apps/wakunode2/config.nim @@ -78,6 +78,11 @@ type defaultValue: 50 name: "max-connections" }: uint16 + peerStoreCapacity* {. + desc: "Maximum stored peers in the peerstore." + defaultValue: 100 + name: "peer-store-capacity" }: int + peerPersistence* {. desc: "Enable peer persistence.", defaultValue: false, diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index f8a10f955..d8917b524 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -273,7 +273,8 @@ proc initNode(conf: WakuNodeConf, conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled dns4DomainName, discv5UdpPort, - some(conf.agentString) + some(conf.agentString), + some(conf.peerStoreCapacity), ) except: return err("failed to create waku node instance: " & getCurrentExceptionMsg()) diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index da7687eec..e09c601e8 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -37,7 +37,7 @@ proc setupAndPublish() {.async.} = ip = ValidIpAddress.init("0.0.0.0") node = WakuNode.new(nodeKey, ip, Port(wakuPort)) flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) - + # assumes behind a firewall, so not care about being discoverable node.wakuDiscv5 = WakuDiscoveryV5.new( extIp= none(ValidIpAddress), @@ -59,7 +59,7 @@ proc setupAndPublish() {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) + let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) if numConnectedPeers >= 6: notice "publisher is ready", connectedPeers=numConnectedPeers, required=6 break @@ -85,4 +85,4 @@ proc setupAndPublish() {.async.} = await sleepAsync(5000) asyncSpawn setupAndPublish() -runForever() \ No newline at end of file +runForever() diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 7573794d1..2621878da 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -55,7 +55,7 @@ proc setupAndSubscribe() {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) + let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) if numConnectedPeers >= 6: notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6 break @@ -81,4 +81,4 @@ proc setupAndSubscribe() {.async.} = asyncSpawn setupAndSubscribe() -runForever() \ No newline at end of file +runForever() diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index e72f8af1f..30dc4e405 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -5,6 +5,7 @@ import stew/shims/net as stewNet, testutils/unittests, chronicles, + chronos, json_rpc/rpcserver, json_rpc/rpcclient, eth/keys, @@ -24,7 +25,8 @@ import ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_swap/waku_swap, - ../test_helpers + ../test_helpers, + ./testlib/testutils procSuite "Peer Manager": asyncTest "Peer dialing works": @@ -34,7 +36,7 @@ procSuite "Peer Manager": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802)) peerInfo2 = node2.switch.peerInfo - + await allFutures([node1.start(), node2.start()]) await node1.mountRelay() @@ -47,17 +49,17 @@ procSuite "Peer Manager": check: conn.activity conn.peerId == peerInfo2.peerId - + # Check that node2 is being managed in node1 check: - node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) # Check connectedness check: - node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected - + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected + await allFutures([node1.stop(), node2.stop()]) - + asyncTest "Dialing fails gracefully": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -65,7 +67,7 @@ procSuite "Peer Manager": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812)) peerInfo2 = node2.switch.peerInfo - + await node1.start() # Purposefully don't start node2 @@ -78,7 +80,7 @@ procSuite "Peer Manager": # Check connection failed gracefully check: connOpt.isNone() - + await node1.stop() asyncTest "Adding, selecting and filtering peers work": @@ -97,7 +99,7 @@ procSuite "Peer Manager": storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet() storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() storePeer = PeerInfo.new(storeKey, @[storeLoc]) - + await node.start() await node.mountFilterClient() @@ -105,25 +107,25 @@ procSuite "Peer Manager": node.mountStoreClient() node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - + node.setStorePeer(storePeer.toRemotePeerInfo()) node.setFilterPeer(filterPeer.toRemotePeerInfo()) # Check peers were successfully added to peer manager check: - node.peerManager.peers().len == 3 - node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and - it.addrs.contains(filterLoc) and - it.protos.contains(WakuFilterCodec)) - node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and - it.addrs.contains(swapLoc) and - it.protos.contains(WakuSwapCodec)) - node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and - it.addrs.contains(storeLoc) and - it.protos.contains(WakuStoreCodec)) - + node.peerManager.peerStore.peers().len == 3 + node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and + it.addrs.contains(filterLoc) and + it.protos.contains(WakuFilterCodec)) + node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and + it.addrs.contains(swapLoc) and + it.protos.contains(WakuSwapCodec)) + node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and + it.addrs.contains(storeLoc) and + it.protos.contains(WakuStoreCodec)) + await node.stop() - + asyncTest "Peer manager keeps track of connections": let @@ -132,7 +134,7 @@ procSuite "Peer Manager": nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832)) peerInfo2 = node2.switch.peerInfo - + await node1.start() await node1.mountRelay() @@ -142,28 +144,28 @@ procSuite "Peer Manager": node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec) check: # No information about node2's connectedness - node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected # Purposefully don't start node2 # Attempt dialing node2 from node1 discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Cannot connect to node2 - node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect # Successful connection await node2.start() discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Currently connected to node2 - node1.peerManager.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Stop node. Gracefully disconnect from all peers. await node1.stop() check: # Not currently connected to node2, but had recent, successful connection. - node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect - + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect + await node2.stop() asyncTest "Peer manager can use persistent storage and survive restarts": @@ -173,9 +175,9 @@ procSuite "Peer Manager": nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842)) peerInfo2 = node2.switch.peerInfo - + await node1.start() await node2.start() @@ -185,33 +187,34 @@ procSuite "Peer Manager": discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Currently connected to node2 - node1.peerManager.peers().len == 1 - node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.peerStore.peers().len == 1 + node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage) - + await node3.start() check: # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.peers().len == 1 - node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.peerStore.peers().len == 1 + node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() # This should trigger a reconnect - + check: # Reconnected to node2 after "restart" - node3.peerManager.peers().len == 1 - node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.connectedness(peerInfo2.peerId) == Connected - + node3.peerManager.peerStore.peers().len == 1 + node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + await allFutures([node1.stop(), node2.stop(), node3.stop()]) - asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": + # TODO: nwaku/issues/1377 + xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] @@ -222,7 +225,7 @@ procSuite "Peer Manager": peerInfo2 = node2.switch.peerInfo betaCodec = "/vac/waku/relay/2.0.0-beta2" stableCodec = "/vac/waku/relay/2.0.0" - + await node1.start() await node2.start() @@ -234,16 +237,16 @@ procSuite "Peer Manager": discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds) check: # Currently connected to node2 - node1.peerManager.peers().len == 1 - node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec)) - node1.peerManager.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.peerStore.peers().len == 1 + node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec)) + node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage) - + await node3.mountRelay() node3.wakuRelay.codec = stableCodec check: @@ -251,19 +254,63 @@ procSuite "Peer Manager": node2.wakuRelay.codec == betaCodec node3.wakuRelay.codec == stableCodec # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.peers().len == 1 - node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peers().anyIt(it.protos.contains(betaCodec)) - node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected - + node3.peerManager.peerStore.peers().len == 1 + node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec)) + node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected + await node3.start() # This should trigger a reconnect check: # Reconnected to node2 after "restart" - node3.peerManager.peers().len == 1 - node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peers().anyIt(it.protos.contains(betaCodec)) - node3.peerManager.peers().anyIt(it.protos.contains(stableCodec)) - node3.peerManager.connectedness(peerInfo2.peerId) == Connected - + node3.peerManager.peerStore.peers().len == 1 + node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec)) + node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec)) + node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + asyncTest "Peer manager connects to all peers supporting a given protocol": + # Create 4 nodes + var nodes: seq[WakuNode] + for i in 0..<4: + let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i)) + nodes &= node + + # Start them + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + # Get all peer infos + let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) + + # Add all peers (but self) to node 0 + nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec) + nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec) + nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec) + + # Attempt to connect to all known peers supporting a given protocol + await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec)) + + check: + # Peerstore track all three peers + nodes[0].peerManager.peerStore.peers().len == 3 + + # All peer ids are correct + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId) + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId) + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId) + + # All peers support the relay protocol + nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec) + nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec) + nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec) + + # All peers are connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected + nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected + + await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim new file mode 100644 index 000000000..2fd8d2567 --- /dev/null +++ b/tests/v2/test_peer_store_extended.nim @@ -0,0 +1,256 @@ +{.used.} + +import + std/[options,sequtils], + libp2p/crypto/crypto, + libp2p/peerstore, + libp2p/multiaddress, + testutils/unittests +import + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/peer_manager/waku_peer_store, + ../../waku/v2/node/waku_node, + ../test_helpers, + ./testlib/testutils + + +suite "Extended nim-libp2p Peer Store": + # Valid peerId missing the last digit. Useful for creating new peerIds + # basePeerId & "1" + # basePeerId & "2" + let basePeerId = "QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" + + setup: + # Setup a nim-libp2p peerstore with some peers + let peerStore = PeerStore.new(capacity = 50) + var p1, p2, p3, p4, p5, p6: PeerId + + # create five peers basePeerId + [1-5] + require p1.init(basePeerId & "1") + require p2.init(basePeerId & "2") + require p3.init(basePeerId & "3") + require p4.init(basePeerId & "4") + require p5.init(basePeerId & "5") + + # peer6 is not part of the peerstore + require p6.init(basePeerId & "6") + + # Peer1: Connected + peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + peerStore[AgentBook][p1] = "nwaku" + peerStore[ProtoVersionBook][p1] = "protoVersion1" + peerStore[ConnectionBook][p1] = Connected + peerStore[DisconnectBook][p1] = 0 + peerStore[SourceBook][p1] = Discv5 + + # Peer2: Connected + peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()] + peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"] + peerStore[KeyBook][p2] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + peerStore[AgentBook][p2] = "nwaku" + peerStore[ProtoVersionBook][p2] = "protoVersion2" + peerStore[ConnectionBook][p2] = Connected + peerStore[DisconnectBook][p2] = 0 + peerStore[SourceBook][p2] = Discv5 + + # Peer3: Connected + peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] + peerStore[ProtoBook][p3] = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + peerStore[KeyBook][p3] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + peerStore[AgentBook][p3] = "gowaku" + peerStore[ProtoVersionBook][p3] = "protoVersion3" + peerStore[ConnectionBook][p3] = Connected + peerStore[DisconnectBook][p3] = 0 + peerStore[SourceBook][p3] = Discv5 + + # Peer4: Added but never connected + peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()] + # unknown: peerStore[ProtoBook][p4] + peerStore[KeyBook][p4] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + # unknown: peerStore[AgentBook][p4] + # unknown: peerStore[ProtoVersionBook][p4] + peerStore[ConnectionBook][p4] = NotConnected + peerStore[DisconnectBook][p4] = 0 + peerStore[SourceBook][p4] = Discv5 + + # Peer5: Connecteed in the past + peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()] + peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] + peerStore[KeyBook][p5] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + peerStore[AgentBook][p5] = "gowaku" + peerStore[ProtoVersionBook][p5] = "protoVersion5" + peerStore[ConnectionBook][p5] = CanConnect + peerStore[DisconnectBook][p5] = 1000 + peerStore[SourceBook][p5] = Discv5 + + test "get() returns the correct StoredInfo for a given PeerId": + # When + let storedInfoPeer1 = peerStore.get(p1) + let storedInfoPeer6 = peerStore.get(p6) + + # Then + check: + # regression on nim-libp2p fields + storedInfoPeer1.peerId == p1 + storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + storedInfoPeer1.agent == "nwaku" + storedInfoPeer1.protoVersion == "protoVersion1" + + # our extended fields + storedInfoPeer1.connectedness == Connected + storedInfoPeer1.disconnectTime == 0 + storedInfoPeer1.origin == Discv5 + + check: + # fields are empty + storedInfoPeer6.peerId == p6 + storedInfoPeer6.addrs.len == 0 + storedInfoPeer6.protos.len == 0 + storedInfoPeer6.agent == "" + storedInfoPeer6.protoVersion == "" + storedInfoPeer6.connectedness == NotConnected + storedInfoPeer6.disconnectTime == 0 + storedInfoPeer6.origin == Unknown + + test "peers() returns all StoredInfo of the PeerStore": + # When + let allPeers = peerStore.peers() + + # Then + check: + allPeers.len == 5 + allPeers.anyIt(it.peerId == p1) + allPeers.anyIt(it.peerId == p2) + allPeers.anyIt(it.peerId == p3) + allPeers.anyIt(it.peerId == p4) + allPeers.anyIt(it.peerId == p5) + + let p3 = allPeers.filterIt(it.peerId == p3)[0] + + check: + # regression on nim-libp2p fields + p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] + p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + p3.agent == "gowaku" + p3.protoVersion == "protoVersion3" + + # our extended fields + p3.connectedness == Connected + p3.disconnectTime == 0 + p3.origin == Discv5 + + test "peers() returns all StoredInfo matching a specific protocol": + # When + let storePeers = peerStore.peers("/vac/waku/store/2.0.0") + let lpPeers = peerStore.peers("/vac/waku/lightpush/2.0.0") + + # Then + check: + # Only p1 and p2 support that protocol + storePeers.len == 2 + storePeers.anyIt(it.peerId == p1) + storePeers.anyIt(it.peerId == p2) + + check: + # Only p3 supports that protocol + lpPeers.len == 1 + lpPeers.anyIt(it.peerId == p3) + lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + + test "peers() returns all StoredInfo matching a given protocolMatcher": + # When + let pMatcherStorePeers = peerStore.peers(protocolMatcher("/vac/waku/store/2.0.0")) + let pMatcherSwapPeers = peerStore.peers(protocolMatcher("/vac/waku/swap/2.0.0")) + + # Then + check: + # peers: 1,2,3,5 match /vac/waku/store/2.0.0/xxx + pMatcherStorePeers.len == 4 + pMatcherStorePeers.anyIt(it.peerId == p1) + pMatcherStorePeers.anyIt(it.peerId == p2) + pMatcherStorePeers.anyIt(it.peerId == p3) + pMatcherStorePeers.anyIt(it.peerId == p5) + + check: + pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"] + pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] + + check: + pMatcherSwapPeers.len == 1 + pMatcherSwapPeers.anyIt(it.peerId == p5) + pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] + + test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo": + # Given + let storedInfoPeer1 = peerStore.get(p1) + + # When + let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo() + + # Then + check: + remotePeerInfo1.peerId == p1 + remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + + test "connectedness() returns the connection status of a given PeerId": + check: + # peers tracked in the peerstore + peerStore.connectedness(p1) == Connected + peerStore.connectedness(p2) == Connected + peerStore.connectedness(p3) == Connected + peerStore.connectedness(p4) == NotConnected + peerStore.connectedness(p5) == CanConnect + + # peer not tracked in the peerstore + peerStore.connectedness(p6) == NotConnected + + test "hasPeer() returns true if the peer supports a given protocol": + check: + peerStore.hasPeer(p1, "/vac/waku/relay/2.0.0-beta1") + peerStore.hasPeer(p1, "/vac/waku/store/2.0.0") + not peerStore.hasPeer(p1, "it-does-not-contain-this-protocol") + + peerStore.hasPeer(p2, "/vac/waku/relay/2.0.0") + peerStore.hasPeer(p2, "/vac/waku/store/2.0.0") + + peerStore.hasPeer(p3, "/vac/waku/lightpush/2.0.0") + peerStore.hasPeer(p3, "/vac/waku/store/2.0.0-beta1") + + # we have no knowledge of p4 supported protocols + not peerStore.hasPeer(p4, "/vac/waku/lightpush/2.0.0") + + peerStore.hasPeer(p5, "/vac/waku/swap/2.0.0") + peerStore.hasPeer(p5, "/vac/waku/store/2.0.0-beta2") + not peerStore.hasPeer(p5, "another-protocol-not-contained") + + # peer 6 is not in the PeerStore + not peerStore.hasPeer(p6, "/vac/waku/lightpush/2.0.0") + + test "hasPeers() returns true if any peer in the PeerStore supports a given protocol": + # Match specific protocols + check: + peerStore.hasPeers("/vac/waku/relay/2.0.0-beta1") + peerStore.hasPeers("/vac/waku/store/2.0.0") + peerStore.hasPeers("/vac/waku/lightpush/2.0.0") + not peerStore.hasPeers("/vac/waku/does-not-exist/2.0.0") + + # Match protocolMatcher protocols + check: + peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0")) + not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0")) + + test "selectPeer() returns if a peer supports a given protocol": + # When + let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0") + + # Then + check: + swapPeer.isSome() + swapPeer.get().peerId == p5 + swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] diff --git a/tests/v2/test_waku_dnsdisc.nim b/tests/v2/test_waku_dnsdisc.nim index bee6a0a5b..2467b9d66 100644 --- a/tests/v2/test_waku_dnsdisc.nim +++ b/tests/v2/test_waku_dnsdisc.nim @@ -88,11 +88,11 @@ procSuite "Waku DNS Discovery": check: # We have successfully connected to all discovered nodes - node4.peerManager.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId) - node4.peerManager.connectedness(node1.switch.peerInfo.peerId) == Connected - node4.peerManager.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId) - node4.peerManager.connectedness(node2.switch.peerInfo.peerId) == Connected - node4.peerManager.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId) - node4.peerManager.connectedness(node3.switch.peerInfo.peerId) == Connected + node4.peerManager.peerStore.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId) + node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected + node4.peerManager.peerStore.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId) + node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected + node4.peerManager.peerStore.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId) + node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()]) diff --git a/tools/wakucanary/wakucanary.nim b/tools/wakucanary/wakucanary.nim index 86d43ef25..9f3ad5f09 100644 --- a/tools/wakucanary/wakucanary.nim +++ b/tools/wakucanary/wakucanary.nim @@ -101,7 +101,7 @@ proc main(): Future[int] {.async.} = # ensure input protocols are valid for p in conf.protocols: - if p notin ProtocolsTable: + if p notin ProtocolsTable: error "invalid protocol", protocol=p, valid=ProtocolsTable raise newException(ConfigurationError, "Invalid cli flag values" & p) @@ -128,7 +128,7 @@ proc main(): Future[int] {.async.} = error "Timedout after", timeout=conf.timeout let lp2pPeerStore = node.switch.peerStore - let conStatus = node.peerManager.peerStore.connectionBook[peer.peerId] + let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId] if conStatus in [Connected, CanConnect]: let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId] diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 57646d1c6..10cdd1d13 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az318-892: +# Libtool was configured on host fv-az39-360: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 4a9333cc2..2cb853770 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -39,7 +39,7 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Admin API version 1 definitions - + rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool: ## Connect to a list of peers debug "post_waku_v2_admin_v1_peers" @@ -64,34 +64,38 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if not node.wakuRelay.isNil: # Map managed peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peers(WakuRelayCodec) + wPeers.insert(node.peerManager.peerStore + .peers(WakuRelayCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuRelayCodec, - connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), + connected: it.connectedness == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence - + if not node.wakuFilter.isNil: # Map WakuFilter peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peers(WakuFilterCodec) + wPeers.insert(node.peerManager.peerStore + .peers(WakuFilterCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuFilterCodec, - connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), + connected: it.connectedness == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence - + if not node.wakuSwap.isNil: # Map WakuSwap peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peers(WakuSwapCodec) + wPeers.insert(node.peerManager.peerStore + .peers(WakuSwapCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuSwapCodec, - connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), + connected: it.connectedness == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence if not node.wakuStore.isNil: # Map WakuStore peers to WakuPeers and add to return list - wPeers.insert(node.peerManager.peers(WakuStoreCodec) + wPeers.insert(node.peerManager.peerStore + .peers(WakuStoreCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuStoreCodec, - connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), + connected: it.connectedness == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence # @TODO filter output on protocol/connected-status diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index f5e75b60a..ccea0e23b 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" - let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec) if peerOpt.isNone(): raise newException(ValueError, "no suitable remote store peers") diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index a7384a6e9..ca9f928de 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -6,8 +6,8 @@ else: import std/[options, sets, sequtils, times], - chronos, - chronicles, + chronos, + chronicles, metrics, libp2p/multistream import @@ -29,7 +29,7 @@ logScope: type PeerManager* = ref object of RootObj switch*: Switch - peerStore*: WakuPeerStore + peerStore*: PeerStore storage: PeerStorage let @@ -39,11 +39,6 @@ let # Helper functions # #################### -proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo = - RemotePeerInfo.init(peerId = storedInfo.peerId, - addrs = toSeq(storedInfo.addrs), - protocols = toSeq(storedInfo.protos)) - proc insertOrReplace(ps: PeerStorage, peerId: PeerID, storedInfo: StoredInfo, @@ -55,7 +50,7 @@ proc insertOrReplace(ps: PeerStorage, warn "failed to store peers", err = res.error waku_peers_errors.inc(labelValues = ["storage_failure"]) -proc dialPeer(pm: PeerManager, peerId: PeerID, +proc dialPeer(pm: PeerManager, peerId: PeerID, addrs: seq[MultiAddress], proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = info "Dialing peer from manager", wireAddr = addrs, peerId = peerId @@ -73,20 +68,20 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, debug "Dialing remote peer timed out" waku_peers_dials.inc(labelValues = ["timeout"]) - pm.peerStore.connectionBook[peerId] = CannotConnect + pm.peerStore[ConnectionBook][peerId] = CannotConnect if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) - + return none(Connection) except CatchableError as e: # TODO: any redial attempts? debug "Dialing remote peer failed", msg = e.msg waku_peers_dials.inc(labelValues = ["failed"]) - - pm.peerStore.connectionBook[peerId] = CannotConnect + + pm.peerStore[ConnectionBook][peerId] = CannotConnect if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) - + return none(Connection) proc loadFromStorage(pm: PeerManager) = @@ -99,49 +94,55 @@ proc loadFromStorage(pm: PeerManager) = # Do not manage self return - pm.peerStore.addressBook[peerId] = storedInfo.addrs - pm.peerStore.protoBook[peerId] = storedInfo.protos - pm.peerStore.keyBook[peerId] = storedInfo.publicKey - pm.peerStore.connectionBook[peerId] = NotConnected # Reset connectedness state - pm.peerStore.disconnectBook[peerId] = disconnectTime - + # nim-libp2p books + pm.peerStore[AddressBook][peerId] = storedInfo.addrs + pm.peerStore[ProtoBook][peerId] = storedInfo.protos + pm.peerStore[KeyBook][peerId] = storedInfo.publicKey + pm.peerStore[AgentBook][peerId] = storedInfo.agent + pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion + + # custom books + pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state + pm.peerStore[DisconnectBook][peerId] = disconnectTime + pm.peerStore[SourceBook][peerId] = storedInfo.origin + let res = pm.storage.getAll(onData) if res.isErr: warn "failed to load peers from storage", err = res.error waku_peers_errors.inc(labelValues = ["storage_load_failure"]) else: debug "successfully queried peer storage" - + ################## # Initialisation # -################## +################## proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = - if not pm.peerStore.addressBook.contains(peerId): + if not pm.peerStore[AddressBook].contains(peerId): ## We only consider connection events if we ## already track some addresses for this peer return case event.kind of ConnEventKind.Connected: - pm.peerStore.connectionBook[peerId] = Connected + pm.peerStore[ConnectionBook][peerId] = Connected if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected) return of ConnEventKind.Disconnected: - pm.peerStore.connectionBook[peerId] = CanConnect + pm.peerStore[ConnectionBook][peerId] = CanConnect if not pm.storage.isNil: pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix) return proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager = let pm = PeerManager(switch: switch, - peerStore: WakuPeerStore.new(), + peerStore: switch.peerStore, storage: storage) - + proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(pm, peerId, event) - + pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) @@ -150,122 +151,68 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer pm.loadFromStorage() # Load previously managed peers. else: debug "no peer storage found" - + return pm ##################### # Manager interface # ##################### -proc peers*(pm: PeerManager): seq[StoredInfo] = - # Return the known info for all peers - pm.peerStore.peers() - -proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] = - # Return the known info for all peers registered on the specified protocol - pm.peers.filterIt(it.protos.contains(proto)) - -proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] = - # Return the known info for all peers matching the provided protocolMatcher - pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it))) - -proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness = - # Return the connection state of the given, managed peer - # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. - # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts - - let storedInfo = pm.peerStore.get(peerId) - - if (storedInfo == StoredInfo()): - # Peer is not managed, therefore not connected - return NotConnected - else: - pm.peerStore.connectionBook[peerId] - -proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool = - # Returns `true` if peer is included in manager for the specified protocol - - pm.peerStore.get(peerId).protos.contains(proto) - -proc hasPeers*(pm: PeerManager, proto: string): bool = - # Returns `true` if manager has any peers for the specified protocol - pm.peers.anyIt(it.protos.contains(proto)) - -proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool = - # Returns `true` if manager has any peers matching the protocolMatcher - pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it))) - proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = # Adds peer to manager for the specified protocol if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: # Do not attempt to manage our unmanageable self return - + debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto - + # ...known addresses for multiaddr in remotePeerInfo.addrs: - pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr) - + pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr + # ...public key var publicKey: PublicKey discard remotePeerInfo.peerId.extractPublicKey(publicKey) - pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey + pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey - # ...associated protocols - pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto) + # nim-libp2p identify overrides this + pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) -proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = - # Selects the best peer for a given protocol - let peers = pm.peers.filterIt(it.protos.contains(proto)) - - if peers.len >= 1: - # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned - let peerStored = peers[0] - - return some(peerStored.toRemotePeerInfo()) - else: - return none(RemotePeerInfo) - proc reconnectPeers*(pm: PeerManager, proto: string, protocolMatcher: Matcher, backoff: chronos.Duration = chronos.seconds(0)) {.async.} = ## Reconnect to peers registered for this protocol. This will update connectedness. ## Especially useful to resume connections from persistent storage after a restart. - + debug "Reconnecting peers", proto=proto - - for storedInfo in pm.peers(protocolMatcher): - # Check if peer is reachable. - if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect: - debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId + + for storedInfo in pm.peerStore.peers(protocolMatcher): + # Check that the peer can be connected + if storedInfo.connectedness == CannotConnect: + debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId continue - + # Respect optional backoff period where applicable. let - disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second) # Convert + # TODO: Add method to peerStore (eg isBackoffExpired()) + disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect - + trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime - + + # TODO: This blocks the whole function. Try to connect to another peer in the meantime. if backoffTime > ZeroDuration: debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime # We disconnected recently and still need to wait for a backoff period before connecting await sleepAsync(backoffTime) - - # Add to protos for peer, if it has not been added yet - if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto): - let remotePeerInfo = storedInfo.toRemotePeerInfo() - trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto - pm.addPeer(remotePeerInfo, proto) trace "Reconnecting to peer", peerId=storedInfo.peerId discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) @@ -277,12 +224,12 @@ proc reconnectPeers*(pm: PeerManager, proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - + # First add dialed peer info to peer store, if it does not exist yet... - if not pm.hasPeer(remotePeerInfo.peerId, proto): + if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto): trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto pm.addPeer(remotePeerInfo, proto) - + if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: # Do not attempt to dial self return none(Connection) @@ -292,7 +239,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial an existing peer by looking up it's existing addrs in the switch's peerStore # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - + if peerId == pm.switch.peerInfo.peerId: # Do not attempt to dial self return none(Connection) @@ -304,10 +251,10 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} = ## `source` indicates source of node addrs (static config, api call, discovery, etc) info "Connecting to node", remotePeer = remotePeer, source = source - + info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId let connOpt = await pm.dialPeer(remotePeer, proto) - + if connOpt.isSome(): info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId waku_node_conns_initiated.inc(labelValues = [source]) @@ -318,7 +265,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} = ## `source` indicates source of node addrs (static config, api call, discovery, etc) info "connectToNodes", len = nodes.len - + for nodeId in nodes: await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source) @@ -333,7 +280,7 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} = ## `source` indicates source of node addrs (static config, api call, discovery, etc) info "connectToNodes", len = nodes.len - + for remotePeerInfo in nodes: await connectToNode(pm, remotePeerInfo, proto, source) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 807423fba..41e5da88d 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -4,12 +4,17 @@ else: {.push raises: [].} import - std/[tables, sequtils, sets], + std/[tables, sequtils, sets, options], libp2p/builders, libp2p/peerstore +import + ../../utils/peers + export peerstore, builders +# TODO rename to peer_store_extended to emphasize its a nimlibp2 extension + type Connectedness* = enum # NotConnected: default state for a new peer. No connection and no further information on connectedness. @@ -20,74 +25,107 @@ type CanConnect, # Connected: actively connected to peer. Connected - + + PeerOrigin* = enum + Unknown, + Discv5, + Static, + Dns + + # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] - DisconnectBook* = ref object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps + # Keeps track of when peers were disconnected in Unix timestamps + DisconnectBook* = ref object of PeerBook[int64] + + # Keeps track of the origin of a peer + SourceBook* = ref object of PeerBook[PeerOrigin] - WakuPeerStore* = ref object - addressBook*: AddressBook - protoBook*: ProtoBook - keyBook*: KeyBook - connectionBook*: ConnectionBook - disconnectBook*: DisconnectBook - StoredInfo* = object - # Collates stored info about a peer - peerId*: PeerID + # Taken from nim-libp2 + peerId*: PeerId addrs*: seq[MultiAddress] protos*: seq[string] publicKey*: PublicKey + agent*: string + protoVersion*: string -proc new*(T: type WakuPeerStore): WakuPeerStore = - let - addressBook = AddressBook(book: initTable[PeerID, seq[MultiAddress]]()) - protoBook = ProtoBook(book: initTable[PeerID, seq[string]]()) - keyBook = KeyBook(book: initTable[PeerID, PublicKey]()) - connectionBook = ConnectionBook(book: initTable[PeerID, Connectedness]()) - disconnectBook = DisconnectBook(book: initTable[PeerID, int64]()) - - T(addressBook: addressBook, - protoBook: protoBook, - keyBook: keyBook, - connectionBook: connectionBook, - disconnectBook: disconnectBook) + # Extended custom fields + connectedness*: Connectedness + disconnectTime*: int64 + origin*: PeerOrigin -##################### -# Utility functions # -##################### - -proc add*[T](peerBook: SeqPeerBook[T], - peerId: PeerId, - entry: T) = - ## Add entry to a given peer. If the peer is not known, - ## it will be set with the provided entry. - - peerBook.book.mgetOrPut(peerId, - newSeq[T]()).add(entry) - - # TODO: Notify clients? - -################## +################## # Peer Store API # ################## -proc get*(peerStore: WakuPeerStore, +proc get*(peerStore: PeerStore, peerId: PeerID): StoredInfo = ## Get the stored information of a given peer. - StoredInfo( + # Taken from nim-libp2 peerId: peerId, - addrs: peerStore.addressBook[peerId], - protos: peerStore.protoBook[peerId], - publicKey: peerStore.keyBook[peerId] + addrs: peerStore[AddressBook][peerId], + protos: peerStore[ProtoBook][peerId], + publicKey: peerStore[KeyBook][peerId], + agent: peerStore[AgentBook][peerId], + protoVersion: peerStore[ProtoVersionBook][peerId], + + # Extended custom fields + connectedness: peerStore[ConnectionBook][peerId], + disconnectTime: peerStore[DisconnectBook][peerId], + origin: peerStore[SourceBook][peerId], ) -proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] = +proc peers*(peerStore: PeerStore): seq[StoredInfo] = ## Get all the stored information of every peer. - - let allKeys = concat(toSeq(keys(peerStore.addressBook.book)), - toSeq(keys(peerStore.protoBook.book)), - toSeq(keys(peerStore.keyBook.book))).toHashSet() + let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()), + toSeq(peerStore[ProtoBook].book.keys()), + toSeq(peerStore[KeyBook].book.keys())).toHashSet() return allKeys.mapIt(peerStore.get(it)) + +proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] = + # Return the known info for all peers registered on the specified protocol + peerStore.peers.filterIt(it.protos.contains(proto)) + +proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] = + # Return the known info for all peers matching the provided protocolMatcher + peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it))) + +proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo = + RemotePeerInfo.init(peerId = storedInfo.peerId, + addrs = toSeq(storedInfo.addrs), + protocols = toSeq(storedInfo.protos)) + + +proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = + # Return the connection state of the given, managed peer + # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. + # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts + return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) + +proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = + # Returns `true` if peer is included in manager for the specified protocol + # TODO: What if peer does not exist in the peerStore? + peerStore.get(peerId).protos.contains(proto) + +proc hasPeers*(peerStore: PeerStore, proto: string): bool = + # Returns `true` if the peerstore has any peer for the specified protocol + toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) + +proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = + # Returns `true` if the peerstore has any peer matching the protocolMatcher + toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) + +proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] = + # Selects the best peer for a given protocol + let peers = peerStore.peers().filterIt(it.protos.contains(proto)) + + if peers.len >= 1: + # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + let peerStored = peers[0] + + return some(peerStored.toRemotePeerInfo()) + else: + return none(RemotePeerInfo) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index d8a9d72bd..426e85771 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -102,7 +102,7 @@ type started*: bool # Indicates that node has started listening -proc protocolMatcher(codec: string): Matcher = +proc protocolMatcher*(codec: string): Matcher = ## Returns a protocol matcher function for the provided codec proc match(proto: string): bool {.gcsafe.} = ## Matches a proto with any postfix to the provided codec. @@ -146,7 +146,8 @@ proc new*(T: type WakuNode, sendSignedPeerRecord = false, dns4DomainName = none(string), discv5UdpPort = none(Port), - agentString = none(string), # defaults to nim-libp2p version + agentString = none(string), # defaults to nim-libp2p version + peerStoreCapacity = none(int), # defaults to nim-libp2p max size ): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = ## Creates a Waku Node instance. @@ -217,7 +218,8 @@ proc new*(T: type WakuNode, secureCertPath = secureCert, nameResolver = nameResolver, sendSignedPeerRecord = sendSignedPeerRecord, - agentString = agentString + agentString = agentString, + peerStoreCapacity = peerStoreCapacity, ) let wakuNode = WakuNode( @@ -357,7 +359,7 @@ proc startRelay*(node: WakuNode) {.async.} = node.subscribe(topic, none(TopicHandler)) # Resume previous relay connections - if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): + if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." # Reconnect to previous relay peers. This will respect a backoff period, if necessary @@ -502,7 +504,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content error "cannot register filter subscription to topic", error="waku filter client is nil" return - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" return @@ -517,7 +519,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte error "cannot unregister filter subscription to content", error="waku filter client is nil" return - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec) if peerOpt.isNone(): error "cannot register filter subscription to topic", error="no suitable remote peers" return @@ -675,7 +677,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History if node.wakuStoreClient.isNil(): return err("waku store client is nil") - let peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec) if peerOpt.isNone(): error "no suitable remote peers" return err("peer_not_found_failure") @@ -764,7 +766,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe error "failed to publish message", error="waku lightpush client is nil" return - let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) + let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): error "failed to publish message", error="no suitable remote peers" return @@ -824,14 +826,15 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.libp2pPing) +# TODO: Move this logic to PeerManager proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = while node.started: # Keep all connected peers alive while running trace "Running keepalive" # First get a list of connected peer infos - let peers = node.peerManager.peers() - .filterIt(node.peerManager.connectedness(it.peerId) == Connected) + let peers = node.peerManager.peerStore.peers() + .filterIt(it.connectedness == Connected) .mapIt(it.toRemotePeerInfo()) # Attempt to retrieve and ping the active outgoing connection for each peer @@ -855,6 +858,9 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) +# TODO: Decouple discovery logic from connection logic +# A discovered peer goes to the PeerStore +# The PeerManager uses to PeerStore to dial peers proc runDiscv5Loop(node: WakuNode) {.async.} = ## Continuously add newly discovered nodes ## using Node Discovery v5 diff --git a/waku/v2/node/wakuswitch.nim b/waku/v2/node/wakuswitch.nim index ecb7b02ee..4bf2c2c05 100644 --- a/waku/v2/node/wakuswitch.nim +++ b/waku/v2/node/wakuswitch.nim @@ -43,7 +43,7 @@ proc withWssTransport*(b: SwitchBuilder, secureKeyPath: string, secureCertPath: string): SwitchBuilder {.raises: [Defect, IOError].} = - + let key : TLSPrivateKey = getSecureKey(secureKeyPath) let cert : TLSCertificate = getSecureCert(secureCertPath) b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr, @@ -71,7 +71,8 @@ proc newWakuSwitch*( wssEnabled: bool = false, secureKeyPath: string = "", secureCertPath: string = "", - agentString = none(string), # defaults to nim-libp2p version + agentString = none(string), # defaults to nim-libp2p version, + peerStoreCapacity = none(int), # defaults to nim-libp2p max size ): Switch {.raises: [Defect, IOError, LPError].} = @@ -88,6 +89,8 @@ proc newWakuSwitch*( .withNameResolver(nameResolver) .withSignedPeerRecord(sendSignedPeerRecord) + if peerStoreCapacity.isSome(): + b = b.withPeerStore(peerStoreCapacity.get()) if agentString.isSome(): b = b.withAgentVersion(agentString.get()) if privKey.isSome(): @@ -103,4 +106,4 @@ proc newWakuSwitch*( else : b = b.withAddress(address) - b.build() \ No newline at end of file + b.build() diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index ff33f1ffe..14fea22b4 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut return ok() proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) + let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) @@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo return ok() proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) + let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index f3a9cdf78..b9f3549dd 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -210,7 +210,7 @@ when defined(waku_exp_store_resume): else: debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" waku_store_errors.inc(labelValues = [peerNotFoundFailure])