diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 47cb2aea5..e80d004e6 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[strformat, strutils, times, json, options, random] +import std/[strformat, strutils, times, options, random] import confutils, chronicles, chronos, stew/shims/net as stewNet, eth/keys, bearssl, stew/[byteutils, results], - nimcrypto/pbkdf2, metrics, metrics/chronos_httpserver import libp2p/[switch, # manage transports, a single entry point for dialing and listening @@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi peerinfo, # manage the information of a peer, such as peer ID and public / private key peerid, # Implement how peers interact protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs - protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS nameresolving/dnsresolver]# define DNS resolution import ../../waku/waku_core, - ../../waku/waku_lightpush, + ../../waku/waku_lightpush/common, ../../waku/waku_lightpush/rpc, ../../waku/waku_filter, ../../waku/waku_enr, diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 6308ea71c..a8516a7b0 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -53,7 +53,7 @@ import ../../waku/waku_peer_exchange, ../../waku/waku_rln_relay, ../../waku/waku_store, - ../../waku/waku_lightpush, + ../../waku/waku_lightpush/common, ../../waku/waku_filter, ../../waku/waku_filter_v2, ./wakunode2_validator_signed, diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 8576f2725..a480adc50 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, times], + std/[options, sequtils, times, sugar], stew/shims/net as stewNet, testutils/unittests, chronos, @@ -21,10 +21,12 @@ import ../../waku/node/peer_manager/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/waku_node, - ../../waku/waku_relay, - ../../waku/waku_store, - ../../waku/waku_filter, - ../../waku/waku_lightpush, + ../../waku/waku_core, + ../../waku/waku_enr/capabilities, + ../../waku/waku_relay/protocol, + ../../waku/waku_store/common, + ../../waku/waku_filter/protocol, + ../../waku/waku_lightpush/common, ../../waku/waku_peer_exchange, ../../waku/waku_metadata, ./testlib/common, @@ -128,7 +130,6 @@ procSuite "Peer Manager": await node.stop() - asyncTest "Peer manager keeps track of connections": # Create 2 nodes let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) @@ -225,18 +226,34 @@ procSuite "Peer Manager": let database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] - node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) - node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) - peerInfo2 = node2.switch.peerInfo + node1 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("127.0.0.1"), + Port(44048), + peerStorage = storage + ) + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)) + + node1.mountMetadata(0).expect("Mounted Waku Metadata") + node2.mountMetadata(0).expect("Mounted Waku 
Metadata") await node1.start() await node2.start() await node1.mountRelay() await node2.mountRelay() + + let peerInfo2 = node2.switch.peerInfo + var remotePeerInfo2 = peerInfo2.toRemotePeerInfo() + remotePeerInfo2.enr = some(node2.enr) - require: - (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true + let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2) + assert is12Connected == true, "Node 1 and 2 not connected" + + check: + node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs + + # wait for the peer store update await sleepAsync(chronos.milliseconds(500)) check: @@ -246,10 +263,17 @@ procSuite "Peer Manager": node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage - let - node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) + let node3 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("127.0.0.1"), + Port(56037), + peerStorage = storage + ) + + node3.mountMetadata(0).expect("Mounted Waku Metadata") await node3.start() + check: # Node2 has been loaded after "restart", but we have not yet reconnected node3.peerManager.peerStore.peers().len == 1 @@ -257,7 +281,10 @@ procSuite "Peer Manager": node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() - await node3.peerManager.connectToRelayPeers() + + await node3.peerManager.manageRelayPeers() + + await sleepAsync(chronos.milliseconds(500)) check: # Reconnected to node2 after "restart" @@ -297,9 +324,9 @@ procSuite "Peer Manager": topics = @["/waku/2/rs/4/0"], ) - discard node1.mountMetadata(clusterId3) - discard node2.mountMetadata(clusterId4) - discard node3.mountMetadata(clusterId4) + node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata") + node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata") + node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata") # Start nodes await allFutures([node1.start(), node2.start(), node3.start()]) @@ -318,7 +345,6 @@ procSuite "Peer Manager": conn2.isNone conn3.isSome - # TODO: nwaku/issues/1377 xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let @@ -377,14 +403,28 @@ procSuite "Peer Manager": asyncTest "Peer manager connects to all peers supporting a given protocol": # Create 4 nodes - let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + let nodes = + toSeq(0..<4) + .mapIt( + newTestWakuNode( + nodeKey = generateSecp256k1Key(), + bindIp = ValidIpAddress.init("0.0.0.0"), + bindPort = Port(0), + wakuFlags = some(CapabilitiesBitfield.init(@[Relay])) + ) + ) # Start them - await allFutures(nodes.mapIt(it.start())) + discard nodes.mapIt(it.mountMetadata(0)) await allFutures(nodes.mapIt(it.mountRelay())) + await allFutures(nodes.mapIt(it.start())) # Get all peer infos - let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) + let peerInfos = collect: + for i in 0..nodes.high: + let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo() + peerInfo.enr = some(nodes[i].enr) + peerInfo # Add all peers (but self) to node 0 nodes[0].peerManager.addPeer(peerInfos[1]) @@ -392,7 +432,7 @@ procSuite "Peer Manager": nodes[0].peerManager.addPeer(peerInfos[3]) # Connect to relay peers - await nodes[0].peerManager.connectToRelayPeers() + await nodes[0].peerManager.manageRelayPeers() check: # Peerstore track all 
three peers diff --git a/tests/test_waku_lightpush.nim b/tests/test_waku_lightpush.nim index de125ac97..76312272f 100644 --- a/tests/test_waku_lightpush.nim +++ b/tests/test_waku_lightpush.nim @@ -11,6 +11,7 @@ import ../../waku/node/peer_manager, ../../waku/waku_core, ../../waku/waku_lightpush, + ../../waku/waku_lightpush/common, ../../waku/waku_lightpush/client, ../../waku/waku_lightpush/protocol_metrics, ../../waku/waku_lightpush/rpc, diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index 7208c587b..4daa12283 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -4,16 +4,12 @@ import std/options, stew/shims/net as stewNet, testutils/unittests, - chronicles, - chronos, - libp2p/crypto/crypto, - libp2p/switch + chronos import ../../waku/waku_core, - ../../waku/waku_lightpush, + ../../waku/waku_lightpush/common, ../../waku/node/peer_manager, ../../waku/waku_node, - ./testlib/common, ./testlib/wakucore, ./testlib/wakunode diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index f614b272b..9ba791204 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -32,7 +32,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")], nat: "any", maxConnections: 50, - topics: @[], + clusterId: 1.uint32, + topics: @["/waku/2/rs/1/0"], relay: true ) @@ -55,8 +56,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - clusterId: uint32 = 2.uint32, - topics: seq[string] = @["/waku/2/rs/2/0"], + clusterId: uint32 = 1.uint32, + topics: seq[string] = @["/waku/2/rs/1/0"], peerStoreCapacity = none(int)): WakuNode = var resolvedExtIp = extIp @@ -66,7 +67,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000)) else: extPort - let conf = defaultTestWakuNodeConf() + var conf = defaultTestWakuNodeConf() + + conf.clusterId = clusterId + conf.topics = topics if dns4DomainName.isSome() and extIp.isNone(): # If there's an error resolving the IP, an exception is thrown and test fails diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index 5581da5c7..72fb4c7b8 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -10,11 +10,10 @@ import import ../../waku/waku_api/message_cache, - ../../waku/common/base64, ../../waku/waku_core, ../../waku/waku_node, ../../waku/node/peer_manager, - ../../waku/waku_lightpush, + ../../waku/waku_lightpush/common, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/client, ../../waku/waku_api/rest/responses, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 69db53eed..dd77aefb6 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -5,7 +5,7 @@ else: import - std/[options, sets, sequtils, times, strutils, math], + std/[options, sugar, sets, sequtils, times, strutils, math], chronos, chronicles, metrics, @@ -17,6 +17,10 @@ import ../../waku_core, ../../waku_relay, ../../waku_enr/sharding, + ../../waku_enr/capabilities, + ../../waku_store/common, + ../../waku_filter_v2/common, + ../../waku_lightpush/common, ../../waku_metadata, ./peer_store/peer_storage, ./waku_peer_store @@ -49,10 +53,10 @@ const BackoffFactor = 4 # Limit the 
amount of paralel dials - MaxParalelDials = 10 + MaxParallelDials = 10 # Delay between consecutive relayConnectivityLoop runs - ConnectivityLoopInterval = chronos.seconds(15) + ConnectivityLoopInterval = chronos.minutes(1) # How often the peer store is pruned PrunePeerStoreInterval = chronos.minutes(10) @@ -115,22 +119,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO # Do not attempt to manage our unmanageable self return - # ...public key - var publicKey: PublicKey - discard remotePeerInfo.peerId.extractPublicKey(publicKey) - if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and - pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and + pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0: # Peer already managed and ENR info is already saved return trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs - + pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs - pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey + pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin - + + if remotePeerInfo.protocols.len > 0: + pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols + if remotePeerInfo.enr.isSome(): pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() @@ -158,27 +161,31 @@ proc connectRelay*(pm: PeerManager, pm.addPeer(peer) let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] - debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts + debug "Connecting to relay peer", + wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts var deadline = sleepAsync(dialTimeout) - var workfut = pm.switch.connect(peerId, peer.addrs) - var reasonFailed = "" + let workfut = pm.switch.connect(peerId, peer.addrs) + + # Can't use catch: with .withTimeout() in this case + let res = catch: await workfut or deadline - try: - await workfut or deadline - if workfut.finished(): + let reasonFailed = + if not workfut.finished(): + await workfut.cancelAndWait() + "timed out" + elif res.isErr(): res.error.msg + else: if not deadline.finished(): - deadline.cancel() + await deadline.cancelAndWait() + waku_peers_dials.inc(labelValues = ["successful"]) waku_node_conns_initiated.inc(labelValues = [source]) - pm.peerStore[NumberFailedConnBook][peerId] = 0 - return true - else: - reasonFailed = "timed out" - await cancelAndWait(workfut) - except CatchableError as exc: - reasonFailed = "remote peer failed" + pm.peerStore[NumberFailedConnBook][peerId] = 0 + + return true + # Dial failed pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1 pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) @@ -214,15 +221,15 @@ proc dialPeer(pm: PeerManager, # Dial Peer let dialFut = pm.switch.dial(peerId, addrs, proto) - var reasonFailed = "" - try: - if (await dialFut.withTimeout(dialTimeout)): + + let res = catch: + if await dialFut.withTimeout(dialTimeout): return some(dialFut.read()) - else: - reasonFailed = "timeout" - await cancelAndWait(dialFut) - except CatchableError as exc: - reasonFailed = "failed" + else: await cancelAndWait(dialFut) + + let reasonFailed = + if res.isOk: "timed out" + else: res.error.msg debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, 
proto=proto @@ -293,105 +300,108 @@ proc canBeConnected*(pm: PeerManager, let now = Moment.init(getTime().toUnix, Second) let lastFailed = pm.peerStore[LastFailedConnBook][peerId] let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts) - if now >= (lastFailed + backoff): - return true - return false + + return now >= (lastFailed + backoff) ################## # Initialisation # ################## proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = - if pm.switch.connManager.getConnections().hasKey(peerId): - let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) - if conns.len != 0: - let observedAddr = conns[0].connection.observedAddr - let ip = observedAddr.get.getHostname() - if observedAddr.isSome: - # TODO: think if circuit relay ips should be handled differently - let ip = observedAddr.get.getHostname() - return some(ip) - return none(string) + if not pm.switch.connManager.getConnections().hasKey(peerId): + return none(string) + + let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) + if conns.len == 0: + return none(string) + + let obAddr = conns[0].connection.observedAddr.valueOr: + return none(string) + + # TODO: think if circuit relay ips should be handled differently + + return some(obAddr.getHostname()) # called when a connection i) is created or ii) is closed proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = case event.kind - of ConnEventKind.Connected: - let direction = if event.incoming: Inbound else: Outbound - discard - of ConnEventKind.Disconnected: - discard + of ConnEventKind.Connected: + #let direction = if event.incoming: Inbound else: Outbound + discard + of ConnEventKind.Disconnected: + discard + +proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = + # To prevent metadata protocol from breaking prev nodes, by now we only + # disconnect if the clusterid is specified. 
+ if pm.wakuMetadata.clusterId == 0: + return + + let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec) + + var reason: string + block guardClauses: + let conn = res.valueOr: + reason = "dial failed: " & error.msg + break guardClauses + + let metadata = (await pm.wakuMetadata.request(conn)).valueOr: + reason = "waku metatdata request failed: " & error + break guardClauses + + let clusterId = metadata.clusterId.valueOr: + reason = "empty cluster-id reported" + break guardClauses + + if pm.wakuMetadata.clusterId != clusterId: + reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId + break guardClauses + + if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)): + reason = "no shards in common" + break guardClauses + + return + + info "disconnecting from peer", peerId=peerId, reason=reason + asyncSpawn(pm.switch.disconnect(peerId)) + pm.peerStore.delete(peerId) # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = + if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: + await pm.onPeerMetadata(peerId) + var direction: PeerDirection var connectedness: Connectedness - if event.kind == PeerEventKind.Joined: - direction = if event.initiator: Outbound else: Inbound - connectedness = Connected + case event.kind: + of Joined: + direction = if event.initiator: Outbound else: Inbound + connectedness = Connected - var clusterOk = false - var reason = "" - # To prevent metadata protocol from breaking prev nodes, by now we only - # disconnect if the clusterid is specified. - if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0: - block wakuMetadata: - var conn: Connection - try: - conn = await pm.switch.dial(peerId, WakuMetadataCodec) - except CatchableError: - reason = "waku metadata codec not supported: " & getCurrentExceptionMsg() - break wakuMetadata + if (let ip = pm.getPeerIp(peerId); ip.isSome()): + pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) - # request metadata from connecting peer - let metadata = (await pm.wakuMetadata.request(conn)).valueOr: - reason = "failed waku metadata codec request" - break wakuMetadata - - # does not report any clusterId - let clusterId = metadata.clusterId.valueOr: - reason = "empty clusterId reported" - break wakuMetadata - - # drop it if it doesnt match our network id - if pm.wakuMetadata.clusterId != clusterId: - reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId - break wakuMetadata - - # reaching here means the clusterId matches - clusterOk = true - - if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk: - info "disconnecting from peer", peerId=peerId, reason=reason - asyncSpawn(pm.switch.disconnect(peerId)) - pm.peerStore.delete(peerId) - - # TODO: Take action depending on the supported shards as reported by metadata - - let ip = pm.getPeerIp(peerId) - if ip.isSome: - pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) - - let peersBehindIp = pm.ipTable[ip.get] - if peersBehindIp.len > pm.colocationLimit: # in theory this should always be one, but just in case - for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]: + let peersBehindIp = pm.ipTable[ip.get] + + let idx = max((peersBehindIp.len - pm.colocationLimit), 0) + for peerId in peersBehindIp[0.. 
pm.inRelayPeersTarget: - # await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget) + # Get all connected peers for Waku Relay + var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec) - if outRelayPeers.len >= pm.outRelayPeersTarget: + # Calculate in/out target number of peers for each shards + let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len + let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len + + for shard in pm.wakuMetadata.shards.items: + # Filter out peer not on this shard + let connectedInPeers = inPeers.filterIt( + pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard))) + + let connectedOutPeers = outPeers.filterIt( + pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard))) + + # Calculate the difference between current values and targets + let inPeerDiff = connectedInPeers.len - inTarget + let outPeerDiff = outTarget - connectedOutPeers.len + + if inPeerDiff > 0: + peersToDisconnect += inPeerDiff + + if outPeerDiff <= 0: + continue + + # Get all peers for this shard + var connectablePeers = pm.peerStore.getPeersByShard( + uint16(pm.wakuMetadata.clusterId), uint16(shard)) + + let shardCount = connectablePeers.len + + connectablePeers.keepItIf( + not pm.peerStore.isConnected(it.peerId) and + pm.canBeConnected(it.peerId)) + + let connectableCount = connectablePeers.len + + connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay)) + + let relayCount = connectablePeers.len + + debug "Sharded Peer Management", + shard = shard, + connectable = $connectableCount & "/" & $shardCount, + relayConnectable = $relayCount & "/" & $shardCount, + relayInboundTarget = $connectedInPeers.len & "/" & $inTarget, + relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget + + let length = min(outPeerDiff, connectablePeers.len) + for peer in connectablePeers[0..
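
A few review-style notes on the patterns this patch introduces; the snippets below are illustrative sketches written against simplified stand-in types, not nwaku APIs. First, the new `onPeerMetadata` replaces the nested try/except with a named `block guardClauses` that breaks out with a reason string on the first failed check. A minimal sketch of that control-flow shape, with a hypothetical `Metadata` object and `checkPeer` proc standing in for the real waku_metadata request/response handling:

```nim
# Sketch of the guard-clause validation shape used by onPeerMetadata.
# `Metadata` and `checkPeer` are simplified stand-ins, not nwaku types.
import std/[options, sequtils]

type Metadata = object
  clusterId: Option[uint32]
  shards: seq[uint32]

proc checkPeer(localCluster: uint32, localShards: seq[uint32],
               remote: Metadata): Option[string] =
  ## Returns some(reason) when the peer should be disconnected, none() otherwise.
  var reason: string
  block guardClauses:
    if remote.clusterId.isNone():
      reason = "empty cluster-id reported"
      break guardClauses

    let clusterId = remote.clusterId.get()
    if localCluster != clusterId:
      reason = "different clusterId reported: " & $localCluster & " vs " & $clusterId
      break guardClauses

    if not remote.shards.anyIt(localShards.contains(it)):
      reason = "no shards in common"
      break guardClauses

    # All checks passed: keep the connection.
    return none(string)

  some(reason)

when isMainModule:
  # Local node on cluster 1, shards {0, 1}; remote reports cluster 2.
  echo checkPeer(1, @[0'u32, 1],
                 Metadata(clusterId: some(2'u32), shards: @[0'u32]))
```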
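
Second, the reworked `onPeerEvent` caps how many peers are kept per observed IP, pruning everything beyond `colocationLimit`. A standalone sketch of that slicing logic, using plain strings as hypothetical peer IDs:

```nim
# Sketch of the per-IP colocation pruning in onPeerEvent: everything but the
# newest `colocationLimit` peers behind an IP gets disconnected.
# The table layout and names are illustrative only.
import std/tables

proc peersToPrune(peersBehindIp: seq[string], colocationLimit: int): seq[string] =
  ## Returns the peers that would be disconnected (all but the newest N).
  let idx = max(peersBehindIp.len - colocationLimit, 0)
  return peersBehindIp[0 ..< idx]

when isMainModule:
  var ipTable = initTable[string, seq[string]]()
  ipTable.mgetOrPut("203.0.113.7", newSeq[string]()).add(@["peerA", "peerB", "peerC"])
  echo peersToPrune(ipTable["203.0.113.7"], colocationLimit = 1)
  # -> @["peerA", "peerB"]
```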
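
Finally, `manageRelayPeers` (which replaces `connectToRelayPeers`) appears to split the global inbound/outbound relay targets evenly across the node's shards, dial only the per-shard outbound shortfall, and tally the inbound excess for pruning. A minimal sketch of that target arithmetic, with hypothetical plain types (`ShardState`, `planDials`) in place of the real PeerManager and PeerStore:

```nim
# Sketch of the per-shard target math behind manageRelayPeers.
# ShardState and planDials are hypothetical, not nwaku APIs.
type ShardState = object
  shard: uint16
  connectedIn: int   # connected inbound relay peers on this shard
  connectedOut: int  # connected outbound relay peers on this shard
  connectable: int   # known, disconnected, not backed-off relay peers on this shard

proc planDials(inTarget, outTarget: int, shards: seq[ShardState]):
    tuple[toDial: int, toDisconnect: int] =
  ## Splits the global targets evenly across shards, dials only the outbound
  ## shortfall per shard and counts the inbound excess for later pruning.
  if shards.len == 0:
    return (toDial: 0, toDisconnect: 0)
  let inPerShard = inTarget div shards.len
  let outPerShard = outTarget div shards.len
  for s in shards:
    let inExcess = s.connectedIn - inPerShard
    if inExcess > 0:
      result.toDisconnect += inExcess
    let outShortfall = outPerShard - s.connectedOut
    if outShortfall > 0:
      result.toDial += min(outShortfall, s.connectable)

when isMainModule:
  let shards = @[
    ShardState(shard: 0, connectedIn: 12, connectedOut: 2, connectable: 5),
    ShardState(shard: 1, connectedIn: 1, connectedOut: 0, connectable: 3)
  ]
  # inRelayPeersTarget = 20 and outRelayPeersTarget = 10 split over 2 shards
  # gives 10 inbound / 5 outbound per shard.
  echo planDials(20, 10, shards)  # (toDial: 6, toDisconnect: 2)
```

One consequence of this even split is that the overall connection budget stays roughly constant as shards are added, at the cost of under-provisioning individual shards when the targets do not divide evenly.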