diff --git a/.gitmodules b/.gitmodules index f3da114d5..cb629275a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -124,7 +124,7 @@ path = vendor/nim-websock url = https://github.com/status-im/nim-websock.git ignore = untracked - branch = master + branch = main [submodule "vendor/nim-zlib"] path = vendor/nim-zlib url = https://github.com/status-im/nim-zlib.git @@ -137,6 +137,6 @@ branch = main [submodule "vendor/dnsclient.nim"] path = vendor/dnsclient.nim - url = https://github.com/jm-clius/dnsclient.nim.git + url = https://github.com/ba0f3/dnsclient.nim.git ignore = untracked branch = master diff --git a/CHANGELOG.md b/CHANGELOG.md index cf3ecc000..a40ec1ce8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This release contains the following: - GossipSub [prune backoff period](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange) is now the recommended 1 minute - Bridge now uses content topic format according to [23/WAKU2-TOPICS](https://rfc.vac.dev/spec/23/) +- Better internal differentiation between local and remote peer info #### General refactoring diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 09b7a4596..9cde3a6a1 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -412,7 +412,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = # We have a viable storenode. Let's query it for historical messages. echo "Connecting to storenode: " & storenode.get() - node.wakuStore.setPeer(parsePeerInfo(storenode.get())) + node.wakuStore.setPeer(parseRemotePeerInfo(storenode.get())) proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: @@ -429,12 +429,12 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = if conf.lightpushnode != "": mountLightPush(node) - node.wakuLightPush.setPeer(parsePeerInfo(conf.lightpushnode)) + node.wakuLightPush.setPeer(parseRemotePeerInfo(conf.lightpushnode)) if conf.filternode != "": node.mountFilter() - node.wakuFilter.setPeer(parsePeerInfo(conf.filternode)) + node.wakuFilter.setPeer(parseRemotePeerInfo(conf.filternode)) proc filterHandler(msg: WakuMessage) {.gcsafe.} = trace "Hit filter handler", contentTopic=msg.contentTopic diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 1cf71ce32..2e2dce015 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -23,6 +23,7 @@ import ../../waku/v2/protocol/waku_store/[waku_store, waku_store_types], ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/utils/peers, ../test_helpers template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] @@ -139,8 +140,8 @@ procSuite "Waku v2 JSON-RPC API": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) # RPC server setup let @@ -229,7 +230,7 @@ procSuite "Waku v2 JSON-RPC API": var listenSwitch = newStandardSwitch(some(key)) discard waitFor listenSwitch.start() - node.wakuStore.setPeer(listenSwitch.peerInfo) + node.wakuStore.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(node.wakuRelay) listenSwitch.mount(node.wakuStore) @@ -530,9 +531,9 @@ procSuite "Waku v2 JSON-RPC API": storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() storePeer = PeerInfo.init(storeKey, @[locationAddr]) - node.wakuFilter.setPeer(filterPeer) - node.wakuSwap.setPeer(swapPeer) - node.wakuStore.setPeer(storePeer) + node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) + node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) + node.wakuStore.setPeer(storePeer.toRemotePeerInfo()) let response = await client.get_waku_v2_admin_v1_peers() @@ -572,8 +573,8 @@ procSuite "Waku v2 JSON-RPC API": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) # Setup two servers so we can see both sides of encrypted communication let @@ -662,8 +663,8 @@ procSuite "Waku v2 JSON-RPC API": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) # Setup two servers so we can see both sides of encrypted communication let diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 5eeb5c864..771178234 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -34,12 +34,12 @@ procSuite "Peer Manager": node2.mountRelay() # Dial node2 from node1 - let conn = (await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec)).get() + let conn = (await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)).get() # Check connection check: conn.activity - conn.peerInfo.peerId == peerInfo2.peerId + conn.peerId == peerInfo2.peerId # Check that node2 is being managed in node1 check: @@ -68,7 +68,7 @@ procSuite "Peer Manager": node2.mountRelay() # Dial node2 from node1 - let connOpt = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + let connOpt = await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) # Check connection failed gracefully check: @@ -100,9 +100,9 @@ procSuite "Peer Manager": node.mountSwap() node.mountStore(persistMessages = true) - node.wakuFilter.setPeer(filterPeer) - node.wakuSwap.setPeer(swapPeer) - node.wakuStore.setPeer(storePeer) + node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) + node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) + node.wakuStore.setPeer(storePeer.toRemotePeerInfo()) # Check peers were successfully added to peer manager check: @@ -136,21 +136,21 @@ procSuite "Peer Manager": node2.mountRelay() # Test default connectedness for new peers - node1.peerManager.addPeer(peerInfo2, WakuRelayCodec) + node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec) check: # No information about node2's connectedness node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected # Purposefully don't start node2 # Attempt dialing node2 from node1 - discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Cannot connect to node2 node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect # Successful connection await node2.start() - discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Currently connected to node2 node1.peerManager.connectedness(peerInfo2.peerId) == Connected @@ -181,7 +181,7 @@ procSuite "Peer Manager": node1.mountRelay() node2.mountRelay() - discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Currently connected to node2 node1.peerManager.peers().len == 1 @@ -233,7 +233,7 @@ asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers node2.mountRelay() node2.wakuRelay.codec = betaCodec - discard await node1.peerManager.dialPeer(peerInfo2, node2.wakuRelay.codec, 2.seconds) + discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds) check: # Currently connected to node2 node1.peerManager.peers().len == 1 diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index 77a1ce159..538a548d5 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -19,6 +19,7 @@ import ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/node/[wakunode2, waku_payload], + ../../waku/v2/utils/peers, ../test_helpers procSuite "WakuBridge": @@ -107,7 +108,7 @@ procSuite "WakuBridge": v2Node.mountRelay(@[DefaultBridgeTopic], triggerSelf = false) discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) - waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo]) + waitFor v2Node.connectToNodes(@[bridge.nodev2.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index ae7466d7e..04e22a067 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -41,7 +41,7 @@ procSuite "Waku Filter": rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = discard @@ -86,7 +86,7 @@ procSuite "Waku Filter": rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = discard diff --git a/tests/v2/test_waku_keepalive.nim b/tests/v2/test_waku_keepalive.nim index 961eb33b1..b89f9fd23 100644 --- a/tests/v2/test_waku_keepalive.nim +++ b/tests/v2/test_waku_keepalive.nim @@ -11,6 +11,7 @@ import libp2p/crypto/crypto, libp2p/multistream, ../../waku/v2/node/wakunode2, + ../../waku/v2/utils/peers, ../test_helpers, ./utils procSuite "Waku Keepalive": @@ -24,11 +25,11 @@ procSuite "Waku Keepalive": var completionFut = newFuture[bool]() - proc pingHandler(peer: PeerInfo) {.async, gcsafe, raises: [Defect].} = + proc pingHandler(peerId: PeerID) {.async, gcsafe, raises: [Defect].} = debug "Ping received" check: - peer.peerId == node1.switch.peerInfo.peerId + peerId == node1.switch.peerInfo.peerId completionFut.complete(true) @@ -40,7 +41,7 @@ procSuite "Waku Keepalive": node2.mountRelay() node2.switch.mount(Ping.new(handler = pingHandler)) - await node1.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) node1.startKeepalive() diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index e13f0c3b3..48c90a981 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -47,7 +47,7 @@ procSuite "Waku Light Push": rpc = PushRequest(pubSubTopic: defaultTopic, message: wm) dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) # TODO Can possibly get rid of this if it isn't dynamic diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 5fb8428ae..9f36fa371 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -35,7 +35,7 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -75,7 +75,7 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -121,7 +121,7 @@ procSuite "Waku Store": # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -165,7 +165,7 @@ procSuite "Waku Store": # this query targets: pubsubtopic1 rpc = HistoryQuery(pubsubTopic: pubsubTopic1) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -205,7 +205,7 @@ procSuite "Waku Store": # this query targets: pubsubtopic rpc = HistoryQuery(pubsubTopic: pubsubtopic) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -249,7 +249,7 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -277,7 +277,7 @@ procSuite "Waku Store": var listenSwitch2 = newStandardSwitch(some(key2)) discard await listenSwitch2.start() - proto2.setPeer(listenSwitch2.peerInfo) + proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo()) listenSwitch2.mount(proto2) @@ -319,7 +319,7 @@ procSuite "Waku Store": proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -366,7 +366,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -413,7 +413,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -551,7 +551,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) - proto.setPeer(listenSwitch.peerInfo) + proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) listenSwitch.mount(proto) @@ -630,7 +630,7 @@ procSuite "Waku Store": discard await dialSwitch2.start() let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) - proto2.setPeer(listenSwitch.peerInfo) + proto2.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) let successResult = await proto2.resume() check: @@ -648,7 +648,7 @@ procSuite "Waku Store": completionFut.complete(true) let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) - let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo) + let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo()) check: (await completionFut.withTimeout(5.seconds)) == true @@ -659,7 +659,7 @@ procSuite "Waku Store": let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) - let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo) + let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) check: messagesResult.isOk @@ -669,7 +669,7 @@ procSuite "Waku Store": var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo) - let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo) + let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) check: messagesResult.isOk @@ -684,7 +684,9 @@ procSuite "Waku Store": discard await dialSwitch3.start() let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng()) - let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo, listenSwitch.peerInfo, listenSwitch.peerInfo])) + let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(), + listenSwitch.peerInfo.toRemotePeerInfo(), + listenSwitch.peerInfo.toRemotePeerInfo()])) check: proto3.messages.len == 10 successResult.isOk diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index d8d9856c6..55284b54d 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -14,6 +14,7 @@ import ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/node/wakunode2, + ../../waku/v2/utils/peers, ../test_helpers, ./utils procSuite "Waku SWAP Accounting": @@ -71,9 +72,9 @@ procSuite "Waku SWAP Accounting": await sleepAsync(2000.millis) - node1.wakuStore.setPeer(node2.peerInfo) - node1.wakuSwap.setPeer(node2.peerInfo) - node2.wakuSwap.setPeer(node1.peerInfo) + node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo()) + node1.wakuSwap.setPeer(node2.peerInfo.toRemotePeerInfo()) + node2.wakuSwap.setPeer(node1.peerInfo.toRemotePeerInfo()) proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = debug "storeHandler hit" @@ -121,9 +122,9 @@ procSuite "Waku SWAP Accounting": await sleepAsync(2000.millis) - node1.wakuStore.setPeer(node2.peerInfo) - node1.wakuSwap.setPeer(node2.peerInfo) - node2.wakuSwap.setPeer(node1.peerInfo) + node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo()) + node1.wakuSwap.setPeer(node2.peerInfo.toRemotePeerInfo()) + node2.wakuSwap.setPeer(node1.peerInfo.toRemotePeerInfo()) proc handler1(response: HistoryResponse) {.gcsafe, closure.} = futures[0].complete(true) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 32584346d..b071c5aab 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -118,7 +118,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received - node1.wakuFilter.setPeer(node2.peerInfo) + node1.wakuFilter.setPeer(node2.peerInfo.toRemotePeerInfo()) await node1.subscribe(filterRequest, contentHandler) await sleepAsync(2000.millis) @@ -161,7 +161,7 @@ procSuite "WakuNode": await node2.start() node2.mountRelay() node2.mountFilter() - node2.wakuFilter.setPeer(node1.peerInfo) + node2.wakuFilter.setPeer(node1.peerInfo.toRemotePeerInfo()) var defaultComplete = newFuture[bool]() var otherComplete = newFuture[bool]() @@ -232,7 +232,7 @@ procSuite "WakuNode": await node2.start() node2.mountRelay(relayMessages=false) # Do not start WakuRelay or subscribe to any topics node2.mountFilter() - node2.wakuFilter.setPeer(node1.peerInfo) + node2.wakuFilter.setPeer(node1.peerInfo.toRemotePeerInfo()) check: node1.wakuRelay.isNil == false # Node1 is a full node @@ -283,7 +283,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) - node1.wakuStore.setPeer(node2.peerInfo) + node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo()) proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = check: @@ -315,7 +315,7 @@ procSuite "WakuNode": await node2.start() node2.mountFilter() - node1.wakuFilter.setPeer(node2.peerInfo) + node1.wakuFilter.setPeer(node2.peerInfo.toRemotePeerInfo()) proc handler(msg: WakuMessage) {.gcsafe, closure.} = check: @@ -360,8 +360,8 @@ procSuite "WakuNode": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -418,7 +418,7 @@ procSuite "WakuNode": # Now verify that protocol matcher returns `true` and relay works - await node1.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -449,37 +449,37 @@ procSuite "WakuNode": # First test the `happy path` expected case let addrStr = "/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - peerInfo = parsePeerInfo(addrStr) + remotePeerInfo = parseRemotePeerInfo(addrStr) check: - $(peerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - $(peerInfo.addrs[0][0].tryGet()) == "/ip4/127.0.0.1" - $(peerInfo.addrs[0][1].tryGet()) == "/tcp/60002" + $(remotePeerInfo.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" + $(remotePeerInfo.addrs[0][0].tryGet()) == "/ip4/127.0.0.1" + $(remotePeerInfo.addrs[0][1].tryGet()) == "/tcp/60002" # Now test some common corner cases expect LPError: # gibberish - discard parsePeerInfo("/p2p/$UCH GIBBER!SH") + discard parseRemotePeerInfo("/p2p/$UCH GIBBER!SH") expect LPError: # leading whitespace - discard parsePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") + discard parseRemotePeerInfo(" /ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") expect LPError: # trailing whitespace - discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ") + discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc ") expect LPError: # invalid IP address - discard parsePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") + discard parseRemotePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") expect ValueError: # no PeerID - discard parsePeerInfo("/ip4/127.0.0.1/tcp/60002") + discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002") expect ValueError: # unsupported transport - discard parsePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") + discard parseRemotePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") asyncTest "filtering relayed messages using topic validators": ## test scenario: @@ -519,8 +519,8 @@ procSuite "WakuNode": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) var completionFutValidatorAcc = newFuture[bool]() @@ -610,8 +610,8 @@ procSuite "WakuNode": await node3.start() node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.peerInfo]) - await node3.connectToNodes(@[node2.peerInfo]) + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -703,9 +703,9 @@ procSuite "WakuNode": await node3.start() node3.mountRelay(@[pubSubTopic]) - discard await node1.peerManager.dialPeer(node2.peerInfo, WakuLightPushCodec) + discard await node1.peerManager.dialPeer(node2.peerInfo.toRemotePeerInfo(), WakuLightPushCodec) await sleepAsync(5.seconds) - await node3.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) var completionFutLightPush = newFuture[bool]() var completionFutRelay = newFuture[bool]() @@ -764,7 +764,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) - node1.wakuStore.setPeer(node2.peerInfo) + node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo()) await node1.resume() @@ -805,7 +805,7 @@ procSuite "WakuNode": await sleepAsync(2000.millis) - node1.wakuStore.setPeer(node2.peerInfo) + node1.wakuStore.setPeer(node2.peerInfo.toRemotePeerInfo()) # populate db with msg1 to be a duplicate diff --git a/vendor/dnsclient.nim b/vendor/dnsclient.nim index c3ddd26a2..536cc6b79 160000 --- a/vendor/dnsclient.nim +++ b/vendor/dnsclient.nim @@ -1 +1 @@ -Subproject commit c3ddd26a2eece2a7bb558cb67d2f92846f9b8402 +Subproject commit 536cc6b7933e5f86590bb27083c0ffeab31255f9 diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 14ebf269e..59b91bf0c 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 14ebf269e9322de5a7f1fa455033b0bcf18144c6 +Subproject commit 59b91bf0ca2d1334bab1e0ed4e02de18fa62f360 diff --git a/vendor/nim-confutils b/vendor/nim-confutils index ab4ba1cbf..7176de4dd 160000 --- a/vendor/nim-confutils +++ b/vendor/nim-confutils @@ -1 +1 @@ -Subproject commit ab4ba1cbfdccdb8c0398894ffc25169bc61faeed +Subproject commit 7176de4ddb3a628a5c3abfcd430010bf0229deb1 diff --git a/vendor/nim-dnsdisc b/vendor/nim-dnsdisc index dcb9290d0..2d448241f 160000 --- a/vendor/nim-dnsdisc +++ b/vendor/nim-dnsdisc @@ -1 +1 @@ -Subproject commit dcb9290d004476fb0a5389baa88121b072abf135 +Subproject commit 2d448241fdb8f8e806089ef4dc978d0eff211117 diff --git a/vendor/nim-eth b/vendor/nim-eth index 20ad6504b..1babe3822 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 20ad6504b7e7869fbf2ce19a7e7a476a80f94cc4 +Subproject commit 1babe382265329a440b6b69a8b0f8b2c2b9a306f diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams index 5eb7fd0c9..3a0ab4257 160000 --- a/vendor/nim-faststreams +++ b/vendor/nim-faststreams @@ -1 +1 @@ -Subproject commit 5eb7fd0c90d3f03b6778688a5893fdd2715e9fe2 +Subproject commit 3a0ab42573e566ce52625760f6bbf7e0bbb6ebc4 diff --git a/vendor/nim-http-utils b/vendor/nim-http-utils index 9a56559ae..689da19e9 160000 --- a/vendor/nim-http-utils +++ b/vendor/nim-http-utils @@ -1 +1 @@ -Subproject commit 9a56559ae3ce7e81b75ae150c1030adf991bf39c +Subproject commit 689da19e9e9cfff4ced85e2b25c6b2b5598ed079 diff --git a/vendor/nim-json-rpc b/vendor/nim-json-rpc index 318949a40..b2417fc07 160000 --- a/vendor/nim-json-rpc +++ b/vendor/nim-json-rpc @@ -1 +1 @@ -Subproject commit 318949a4013504f4ec8931f14bc1b5d6e00dee78 +Subproject commit b2417fc0719a6d5069437a3097645d1fae6954d6 diff --git a/vendor/nim-json-serialization b/vendor/nim-json-serialization index 652099a95..4f3775ddf 160000 --- a/vendor/nim-json-serialization +++ b/vendor/nim-json-serialization @@ -1 +1 @@ -Subproject commit 652099a95960be7790e2f4b4c925d0dd703cc9aa +Subproject commit 4f3775ddf48d9abee30c51a53862cea84a09fa78 diff --git a/vendor/nim-libbacktrace b/vendor/nim-libbacktrace index b70db54e0..284b3aac0 160000 --- a/vendor/nim-libbacktrace +++ b/vendor/nim-libbacktrace @@ -1 +1 @@ -Subproject commit b70db54e073988f334904cddbfc840c9698ba74e +Subproject commit 284b3aac05a9d96c27044c389a5d27a84d8e8f4b diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index f274bfe19..75bfc1b5f 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit f274bfe19db5a39ffbca177b52db7e8a7eb44537 +Subproject commit 75bfc1b5f7679afc104bd1ceee1a0dc3bab7a316 diff --git a/vendor/nim-serialization b/vendor/nim-serialization index 5213d397f..fcd0eadad 160000 --- a/vendor/nim-serialization +++ b/vendor/nim-serialization @@ -1 +1 @@ -Subproject commit 5213d397f9d85c69279961256e19a859cd32df30 +Subproject commit fcd0eadadde0ee000a63df8ab21dc4e9f015a790 diff --git a/vendor/nim-stew b/vendor/nim-stew index 3c91b8694..478cc6efd 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit 3c91b8694e15137a81ec7db37c6c58194ec94a6a +Subproject commit 478cc6efdefaabadf0666a3351fb959b78009bcc diff --git a/vendor/nim-unittest2 b/vendor/nim-unittest2 index 91d4eaa4c..f1d70dbb8 160000 --- a/vendor/nim-unittest2 +++ b/vendor/nim-unittest2 @@ -1 +1 @@ -Subproject commit 91d4eaa4ccb4bfddf179fe2ee4247ae000e2587f +Subproject commit f1d70dbb8c7b5e2474b0bd5ac52f42c8c4318fd2 diff --git a/vendor/nim-web3 b/vendor/nim-web3 index 97e05aea6..9a23474af 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit 97e05aea6573d2630e318e7777a54d95db6ec40e +Subproject commit 9a23474afb7e2a14798ec0bf0e69e96cd5895e55 diff --git a/vendor/nim-websock b/vendor/nim-websock index d60df8176..1abf5f2f9 160000 --- a/vendor/nim-websock +++ b/vendor/nim-websock @@ -1 +1 @@ -Subproject commit d60df8176d187683cbfd0945b37fc8c885594ac9 +Subproject commit 1abf5f2f91ae3e8483c2acbb108dea521879c6e2 diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 77747657f..9d6b4b6e9 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 77747657f65a5fe26c281445b6ee9a1d6e72b1eb +Subproject commit 9d6b4b6e98515af8248127f51889c24308006096 diff --git a/vendor/nimcrypto b/vendor/nimcrypto index b602bd469..a5742a9a2 160000 --- a/vendor/nimcrypto +++ b/vendor/nimcrypto @@ -1 +1 @@ -Subproject commit b602bd469b66f6968f1d1b474f843a75d1ca6627 +Subproject commit a5742a9a214ac33f91615f3862c7b099aec43b00 diff --git a/waku/v2/node/dnsdisc/waku_dnsdisc.nim b/waku/v2/node/dnsdisc/waku_dnsdisc.nim index 8bfcbb866..67976edc3 100644 --- a/waku/v2/node/dnsdisc/waku_dnsdisc.nim +++ b/waku/v2/node/dnsdisc/waku_dnsdisc.nim @@ -15,9 +15,10 @@ import eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, libp2p/crypto/secp, - libp2p/peerinfo, libp2p/multiaddress, - discovery/dnsdisc/client + libp2p/peerid, + discovery/dnsdisc/client, + ../../utils/peers export client @@ -45,7 +46,7 @@ func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] = return none(IpTransportProtocol) -func toPeerInfo*(enr: enr.Record): Result[PeerInfo, cstring] = +func toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = let typedR = ? enr.toTypedRecord if not typedR.secp256k1.isSome: @@ -94,7 +95,7 @@ func toPeerInfo*(enr: enr.Record): Result[PeerInfo, cstring] = if addrs.len == 0: return err("enr: no addresses in record") - return ok(PeerInfo.init(peerId, addrs)) + return ok(RemotePeerInfo.init(peerId, addrs)) func createEnr*(privateKey: crypto.PrivateKey, enrIp: Option[ValidIpAddress], @@ -117,7 +118,7 @@ proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} = debug "Empty resolver called", domain=domain return "" -proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[PeerInfo], cstring] = +proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] = ## Find peers to connect to using DNS based discovery info "Finding peers using Waku DNS discovery" @@ -138,11 +139,11 @@ proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[PeerInfo], cstring] = else: trace "No ENR retrieved from client tree" - var discoveredNodes: seq[PeerInfo] + var discoveredNodes: seq[RemotePeerInfo] for enr in discoveredEnr: - # Convert discovered ENR to PeerInfo and add to discovered nodes - let res = enr.toPeerInfo() + # Convert discovered ENR to RemotePeerInfo and add to discovered nodes + let res = enr.toRemotePeerInfo() if res.isOk(): discoveredNodes.add(res.get()) diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index a8494448e..fb6e4a2b0 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -24,6 +24,10 @@ proc constructMultiaddrStr*(peerInfo: PeerInfo): string = # Constructs a multiaddress with both location (wire) address and p2p identity constructMultiaddrStr(peerInfo.addrs[0], peerInfo.peerId) +proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) + proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Admin API version 1 definitions diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 594284920..9bffd79fb 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -5,9 +5,10 @@ import chronos, chronicles, metrics, libp2p/multistream, ./waku_peer_store, - ../storage/peer/peer_storage + ../storage/peer/peer_storage, + ../../utils/peers -export waku_peer_store, peer_storage +export waku_peer_store, peer_storage, peers declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] @@ -28,10 +29,10 @@ let # Helper functions # #################### -proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo = - PeerInfo.init(peerId = storedInfo.peerId, - addrs = toSeq(storedInfo.addrs), - protocols = toSeq(storedInfo.protos)) +proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo = + RemotePeerInfo.init(peerId = storedInfo.peerId, + addrs = toSeq(storedInfo.addrs), + protocols = toSeq(storedInfo.protos)) proc insertOrReplace(ps: PeerStorage, peerId: PeerID, @@ -125,8 +126,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer debug "creating new PeerManager" - proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} = - onConnEvent(pm, peerInfo.peerId, event) + proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = + onConnEvent(pm, peerId, event) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) @@ -155,7 +156,7 @@ proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] = # Return the known info for all peers matching the provided protocolMatcher pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it))) -proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness = +proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness = # Return the connection state of the given, managed peer # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts @@ -168,10 +169,10 @@ proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness = else: pm.peerStore.connectionBook.get(peerId) -proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = +proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol - pm.peerStore.get(peerInfo.peerId).protos.contains(proto) + pm.peerStore.get(peerId).protos.contains(proto) proc hasPeers*(pm: PeerManager, proto: string): bool = # Returns `true` if manager has any peers for the specified protocol @@ -181,33 +182,33 @@ proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool = # Returns `true` if manager has any peers matching the protocolMatcher pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it))) -proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) = +proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = # Adds peer to manager for the specified protocol - if peerInfo.peerId == pm.switch.peerInfo.peerId: + if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: # Do not attempt to manage our unmanageable self return - debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto + debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto # ...known addresses - for multiaddr in peerInfo.addrs: - pm.peerStore.addressBook.add(peerInfo.peerId, multiaddr) + for multiaddr in remotePeerInfo.addrs: + pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr) # ...public key var publicKey: PublicKey - discard peerInfo.peerId.extractPublicKey(publicKey) + discard remotePeerInfo.peerId.extractPublicKey(publicKey) - pm.peerStore.keyBook.set(peerInfo.peerId, publicKey) + pm.peerStore.keyBook.set(remotePeerInfo.peerId, publicKey) # ...associated protocols - pm.peerStore.protoBook.add(peerInfo.peerId, proto) + pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto) # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: - pm.storage.insertOrReplace(peerInfo.peerId, pm.peerStore.get(peerInfo.peerId), NotConnected) + pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) -proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] = +proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = # Selects the best peer for a given protocol let peers = pm.peers.filterIt(it.protos.contains(proto)) @@ -215,9 +216,9 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] = # @TODO proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned let peerStored = peers[0] - return some(peerStored.toPeerInfo()) + return some(peerStored.toRemotePeerInfo()) else: - return none(PeerInfo) + return none(RemotePeerInfo) proc reconnectPeers*(pm: PeerManager, proto: string, @@ -249,9 +250,9 @@ proc reconnectPeers*(pm: PeerManager, # Add to protos for peer, if it has not been added yet if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto): - let peerInfo = storedInfo.toPeerInfo() - trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto - pm.addPeer(peerInfo, proto) + let remotePeerInfo = storedInfo.toRemotePeerInfo() + trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto + pm.addPeer(remotePeerInfo, proto) trace "Reconnecting to peer", peerId=storedInfo.peerId discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) @@ -260,17 +261,31 @@ proc reconnectPeers*(pm: PeerManager, # Dialer interface # #################### -proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = +proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers # @TODO check peer validity and score before continuing. Limit number of peers to be managed. # First add dialed peer info to peer store, if it does not exist yet... - if not pm.hasPeer(peerInfo, proto): - trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto - pm.addPeer(peerInfo, proto) + if not pm.hasPeer(remotePeerInfo.peerId, proto): + trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto + pm.addPeer(remotePeerInfo, proto) - if peerInfo.peerId == pm.switch.peerInfo.peerId: + if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: # Do not attempt to dial self return none(Connection) - return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout) + return await pm.dialPeer(remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout) + +proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = + # Dial an existing peer by looking up it's existing addrs in the switch's peerStore + # @TODO check peer validity and score before continuing. Limit number of peers to be managed. + + if peerId == pm.switch.peerInfo.peerId: + # Do not attempt to dial self + return none(Connection) + + let addrs = toSeq(pm.switch.peerStore.addressBook.get(peerId)) + if addrs.len == 0: + return none(Connection) + + return await pm.dialPeer(peerId, addrs, proto, dialTimeout) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 1b992cc1d..a73de5ee4 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -28,6 +28,13 @@ type keyBook*: KeyBook connectionBook*: ConnectionBook disconnectBook*: DisconnectBook + + StoredInfo* = object + # Collates stored info about a peer + peerId*: PeerID + addrs*: HashSet[MultiAddress] + protos*: HashSet[string] + publicKey*: PublicKey proc new*(T: type WakuPeerStore): WakuPeerStore = var p: WakuPeerStore diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 3a765c437..84136868f 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -347,7 +347,7 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as # TODO wakuSwap now part of wakuStore object await node.wakuStore.queryWithAccounting(query, handler) -proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])) {.async, gcsafe.} = +proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) ## messages are stored in the the wakuStore's messages field and in the message db @@ -627,7 +627,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = # First get a list of connected peer infos let peers = node.peerManager.peers() .filterIt(node.peerManager.connectedness(it.peerId) == Connected) - .mapIt(it.toPeerInfo()) + .mapIt(it.toRemotePeerInfo()) # Attempt to retrieve and ping the active outgoing connection for each peer for peer in peers: @@ -654,7 +654,7 @@ proc startKeepalive*(node: WakuNode) = proc dialPeer*(n: WakuNode, address: string) {.async.} = info "dialPeer", address = address # XXX: This turns ipfs into p2p, not quite sure why - let remotePeer = parsePeerInfo(address) + let remotePeer = parseRemotePeerInfo(address) info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId # NOTE This is dialing on WakuRelay protocol specifically @@ -664,21 +664,21 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} = proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = info "Set store peer", address = address - let remotePeer = parsePeerInfo(address) + let remotePeer = parseRemotePeerInfo(address) n.wakuStore.setPeer(remotePeer) proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = info "Set filter peer", address = address - let remotePeer = parsePeerInfo(address) + let remotePeer = parseRemotePeerInfo(address) n.wakuFilter.setPeer(remotePeer) proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = info "Set lightpush peer", address = address - let remotePeer = parsePeerInfo(address) + let remotePeer = parseRemotePeerInfo(address) n.wakuLightPush.setPeer(remotePeer) @@ -696,10 +696,10 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} = # later. await sleepAsync(5.seconds) -proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} = - for peerInfo in nodes: - info "connectToNodes", peer = peerInfo - discard await n.peerManager.dialPeer(peerInfo, WakuRelayCodec) +proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo]) {.async.} = + for remotePeerInfo in nodes: + info "connectToNodes", peer = remotePeerInfo + discard await n.peerManager.dialPeer(remotePeerInfo, WakuRelayCodec) # The issue seems to be around peers not being fully connected when # trying to subscribe. So what we do is sleep to guarantee nodes are diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 8cfea3e0b..71c05b854 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -59,7 +59,7 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics for subscriber in subscribers.mitems: - if subscriber.peer.peerId != peerId: continue + if subscriber.peer != peerId: continue # make sure we delete the content filter # if no more topics are left @@ -179,9 +179,9 @@ method init*(wf: WakuFilter) = wf.pushHandler(value.requestId, value.push) if value.request != FilterRequest(): if value.request.subscribe: - wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) + wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request)) else: - wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId) + wf.subscribers.unsubscribeFilters(value.request, conn.peerId) waku_filter_subscribers.set(wf.subscribers.len.int64) @@ -197,7 +197,7 @@ proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgCont return wf -proc setPeer*(wf: WakuFilter, peer: PeerInfo) = +proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = wf.peerManager.addPeer(peer, WakuFilterCodec) waku_filter_peers.inc() diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index ef25649b8..29c4179ec 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -1,7 +1,6 @@ import std/[tables], bearssl, - libp2p/peerinfo, libp2p/protocols/protocol, ../../node/peer_manager/peer_manager, ../waku_message @@ -35,7 +34,7 @@ type push*: MessagePush Subscriber* = object - peer*: PeerInfo + peer*: PeerID requestId*: string filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim index 79970de1c..a221e22f4 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim @@ -117,7 +117,7 @@ proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgC return wl -proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) = +proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = wlp.peerManager.addPeer(peer, WakuLightPushCodec) waku_lightpush_peers.inc() diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim index bb3602e8e..1862b7fb6 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim @@ -1,7 +1,6 @@ import std/[tables], bearssl, - libp2p/peerinfo, libp2p/protocols/protocol, ../../node/peer_manager/peer_manager, ../waku_message, diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 0c573d7ab..7058f2538 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -429,9 +429,9 @@ proc init*(ws: WakuStore) = if not ws.wakuSwap.isNil: info "handle store swap test", text=ws.wakuSwap.text # NOTE Perform accounting operation - let peerInfo = conn.peerInfo + let peerId = conn.peerId let messages = response.messages - ws.wakuSwap.credit(peerInfo, messages.len) + ws.wakuSwap.credit(peerId, messages.len) else: info "handle store swap is nil" @@ -467,7 +467,7 @@ proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgConte return output # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY -proc setPeer*(ws: WakuStore, peer: PeerInfo) = +proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() @@ -527,7 +527,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) -proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async, gcsafe.} = +proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} = ## sends the query to the given peer ## returns the number of retrieved messages if no error occurs, otherwise returns the error string # TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary @@ -556,7 +556,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe handler(response.value.response) return ok(response.value.response.messages.len.uint64) -proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Future[MessagesResult] {.async, gcsafe.} = +proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} = ## a thin wrapper for queryFrom ## sends the query to the given peer ## when the query has a valid pagingInfo, it retrieves the historical messages in pages @@ -588,7 +588,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Fu return ok(messageList) -proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[PeerInfo]): Future[MessagesResult] {.async, gcsafe.} = +proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} = ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully ## returns the retrieved messages, or error if all the requests fail for peer in candidateList.items: @@ -612,7 +612,7 @@ proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool = if message in list: return true return false -proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} = +proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -689,8 +689,8 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] return err("no suitable remote peers") debug "a peer is selected from peer manager" - let peerInfo = peerOpt.get() - let successResult = await ws.queryFromWithPaging(rpc, peerInfo) + let remotePeerInfo = peerOpt.get() + let successResult = await ws.queryFromWithPaging(rpc, remotePeerInfo) if successResult.isErr: debug "failed to resume the history" return err("failed to resume the history") @@ -735,9 +735,9 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand # NOTE Perform accounting operation # Assumes wakuSwap protocol is mounted - let peerInfo = peerOpt.get() + let remotePeerInfo = peerOpt.get() let messages = response.value.response.messages - ws.wakuSwap.debit(peerInfo, messages.len) + ws.wakuSwap.debit(remotePeerInfo.peerId, messages.len) waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 0c3d43130..9731b9995 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -6,7 +6,6 @@ import # external imports bearssl, - libp2p/peerinfo, libp2p/protocols/protocol, stew/results, # internal imports diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index 99a08fb77..79a2367b4 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -114,8 +114,8 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = # TODO Assume we calculated cheque -proc sendCheque*(ws: WakuSwap, peerInfo : PeerInfo) {.async.} = - let connOpt = await ws.peerManager.dialPeer(peerInfo, WakuSwapCodec) +proc sendCheque*(ws: WakuSwap, peerId: PeerID) {.async.} = + let connOpt = await ws.peerManager.dialPeer(peerId, WakuSwapCodec) if connOpt.isNone(): # @TODO more sophisticated error handling here @@ -145,16 +145,13 @@ proc sendCheque*(ws: WakuSwap, peerInfo : PeerInfo) {.async.} = await connOpt.get().writeLP(Cheque(amount: 1, signature: sigBytes, issuerAddress: aliceWalletAddress).encode().buffer) # Set new balance - let peerId = peerInfo.peerId ws.accounting[peerId] -= 1 info "New accounting state", accounting = ws.accounting[peerId] # TODO Authenticate cheque, check beneficiary etc -proc handleCheque*(ws: WakuSwap, cheque: Cheque, peerInfo : PeerInfo) {.raises: [Defect, KeyError].} = +proc handleCheque*(ws: WakuSwap, cheque: Cheque, peerId: PeerID) {.raises: [Defect, KeyError].} = info "handle incoming cheque" - let peerId = peerInfo.peerId - # Get the original signer using web3. For now, a static value (0x6C3d502f1a97d4470b881015b83D9Dd1062172e1) will be used. # Check if web3.eth.personal.ecRecover(messageHash, signature); or an equivalent function has been implemented in nim-web3 let signer = "0x6C3d502f1a97d4470b881015b83D9Dd1062172e1" @@ -227,34 +224,34 @@ proc init*(wakuSwap: WakuSwap) = return info "received cheque", value=res.value - wakuSwap.handleCheque(res.value, conn.peerInfo) + wakuSwap.handleCheque(res.value, conn.peerId) - proc credit(peerInfo: PeerInfo, n: int) + proc credit(peerId: PeerID, n: int) {.gcsafe, closure, raises: [Defect, KeyError, Exception].} = - let peerId = peerInfo.peerId + info "Crediting peer: ", peer=peerId, amount=n if wakuSwap.accounting.hasKey(peerId): wakuSwap.accounting[peerId] -= n else: wakuSwap.accounting[peerId] = -n info "Accounting state", accounting = wakuSwap.accounting[peerId] - wakuSwap.applyPolicy(peerInfo) + wakuSwap.applyPolicy(peerId) # TODO Debit and credit here for Karma asset - proc debit(peerInfo: PeerInfo, n: int) + proc debit(peerId: PeerID, n: int) {.gcsafe, closure, raises: [Defect, KeyError, Exception].} = - let peerId = peerInfo.peerId + info "Debiting peer: ", peer=peerId, amount=n if wakuSwap.accounting.hasKey(peerId): wakuSwap.accounting[peerId] += n else: wakuSwap.accounting[peerId] = n info "Accounting state", accounting = wakuSwap.accounting[peerId] - wakuSwap.applyPolicy(peerInfo) + wakuSwap.applyPolicy(peerId) - proc applyPolicy(peerInfo: PeerInfo) + proc applyPolicy(peerId: PeerID) {.gcsafe, closure, raises: [Defect, KeyError, Exception].} = - let peerId = peerInfo.peerId + # TODO Separate out depending on if policy is soft (accounting only) mock (send cheque but don't cash/verify) hard (actually send funds over testnet) #Check if the Disconnect Threshold has been hit. Account Balance nears the disconnectThreshold after a Credit has been done @@ -268,7 +265,7 @@ proc init*(wakuSwap: WakuSwap) = warn "Payment threshhold has been reached: ", threshold=wakuSwap.config.paymentThreshold, balance=wakuSwap.accounting[peerId] #In soft phase we don't send cheques yet if wakuSwap.config.mode == Mock: - discard wakuSwap.sendCheque(peerInfo) + discard wakuSwap.sendCheque(peerId) else: info "Payment threshhold not hit" @@ -296,7 +293,7 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref BrHmacDrbgContex return ws -proc setPeer*(ws: WakuSwap, peer: PeerInfo) = +proc setPeer*(ws: WakuSwap, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuSwapCodec) waku_swap_peers_count.inc() diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 43fdab33b..8ebdf9825 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -4,7 +4,6 @@ import std/tables, bearssl, libp2p/protocols/protocol, - libp2p/peerinfo, ../../node/peer_manager/peer_manager type @@ -36,9 +35,9 @@ type amount*: uint32 signature*: seq[byte] - CreditHandler* = proc (peerInfo: PeerInfo, amount: int) {.gcsafe, closure.} - DebitHandler* = proc (peerInfo: PeerInfo, amount: int) {.gcsafe, closure.} - ApplyPolicyHandler* = proc(peerInfo: PeerInfo) {.gcsafe, closure.} + CreditHandler* = proc (peerId: PeerID, amount: int) {.gcsafe, closure.} + DebitHandler* = proc (peerId: PeerID, amount: int) {.gcsafe, closure.} + ApplyPolicyHandler* = proc(peerId: PeerID) {.gcsafe, closure.} WakuSwap* = ref object of LPProtocol peerManager*: PeerManager diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index a5c478bae..98898400b 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -3,8 +3,46 @@ # Collection of utilities related to Waku peers import std/strutils, - libp2p/multiaddress, - libp2p/peerinfo + stew/results, + libp2p/[errors, + multiaddress, + peerid, + peerinfo] + +type + RemotePeerInfo* = ref object of RootObj + peerId*: PeerID + addrs*: seq[MultiAddress] + protocols*: seq[string] + +func `$`*(remotePeerInfo: RemotePeerInfo): string = + $remotePeerInfo.peerId + +proc init*( + p: typedesc[RemotePeerInfo], + peerId: PeerID, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): RemotePeerInfo = + + let remotePeerInfo = RemotePeerInfo( + peerId: peerId, + addrs: addrs, + protocols: protocols) + + return remotePeerInfo + +proc init*(p: typedesc[RemotePeerInfo], + peerId: string, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): RemotePeerInfo + {.raises: [Defect, ResultError[cstring], LPError].} = + + let remotePeerInfo = RemotePeerInfo( + peerId: PeerID.init(peerId).tryGet(), + addrs: addrs, + protocols: protocols) + + return remotePeerInfo proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueError, LPError].}= # @TODO: Rather than raising exceptions, this should return a Result @@ -17,7 +55,7 @@ proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueE ## Parses a fully qualified peer multiaddr, in the ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo -proc parsePeerInfo*(address: string): PeerInfo {.raises: [Defect, ValueError, LPError].}= +proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}= let multiAddr = MultiAddress.initAddress(address) var @@ -40,4 +78,11 @@ proc parsePeerInfo*(address: string): PeerInfo {.raises: [Defect, ValueError, LP if (not wireAddr.isWire()): raise newException(ValueError, "Invalid node multi-address") - return PeerInfo.init(peerIdStr, [wireAddr]) \ No newline at end of file + return RemotePeerInfo.init(peerIdStr, @[wireAddr]) + +## Converts the local peerInfo to dialable RemotePeerInfo +## Useful for testing or internal connections +proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = + RemotePeerInfo.init(peerInfo.peerId, + peerInfo.addrs, + peerInfo.protocols)