From 5046a4b3da161b887f40d799b596d5a1b94879f1 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 18 Oct 2022 09:05:53 -0500 Subject: [PATCH] refactor(node): split wakunode into waku_node and wakunode2 --- apps/chat2/chat2.nim | 4 +- apps/chat2bridge/chat2bridge.nim | 2 +- apps/wakubridge/wakubridge.nim | 6 +- examples/v2/basic2.nim | 2 +- tests/v2/test_jsonrpc_waku.nim | 10 +- tests/v2/test_peer_exchange.nim | 2 +- tests/v2/test_peer_manager.nim | 8 +- tests/v2/test_rest_debug_api.nim | 2 +- tests/v2/test_rest_relay_api.nim | 2 +- tests/v2/test_waku_bridge.nim | 2 +- tests/v2/test_waku_discv5.nim | 2 +- tests/v2/test_waku_dnsdisc.nim | 2 +- tests/v2/test_waku_keepalive.nim | 2 +- tests/v2/test_waku_peer_exchange.nim | 2 +- tests/v2/test_waku_rln_relay.nim | 2 +- tests/v2/test_waku_rln_relay_onchain.nim | 2 +- tests/v2/test_waku_swap.nim | 2 +- tests/v2/test_wakunode.nim | 2 +- tests/v2/test_wakunode_filter.nim | 2 +- tests/v2/test_wakunode_lightpush.nim | 2 +- tests/v2/test_wakunode_relay.nim | 2 +- tests/v2/test_wakunode_rln_relay.nim | 2 +- tests/v2/test_wakunode_store.nim | 2 +- tools/scripts/rpc_info.nim | 2 +- tools/scripts/rpc_publish.nim | 2 +- tools/scripts/rpc_query.nim | 2 +- tools/scripts/rpc_subscribe.nim | 2 +- tools/scripts/rpc_subscribe_filter.nim | 2 +- tools/simulation/quicksim2.nim | 2 +- tools/wakucanary/wakucanary.nim | 2 +- waku/v2/node/jsonrpc/admin_api.nim | 3 +- waku/v2/node/jsonrpc/debug_api.nim | 2 +- waku/v2/node/jsonrpc/filter_api.nim | 2 +- waku/v2/node/jsonrpc/private_api.nim | 2 +- waku/v2/node/jsonrpc/relay_api.nim | 2 +- waku/v2/node/jsonrpc/store_api.nim | 2 +- waku/v2/node/rest/debug/api_types.nim | 2 +- waku/v2/node/rest/debug/debug_api.nim | 2 +- waku/v2/node/rest/relay/relay_api.nim | 2 +- .../storage/message/dual_message_store.nim | 2 +- waku/v2/node/waku_node.nim | 830 ++++++++++++++++++ waku/v2/node/wakunode2.nim | 787 +---------------- waku/v2/node/wakunode2_setup_metrics.nim | 2 +- waku/v2/node/wakunode2_setup_rest.nim | 2 +- waku/v2/node/wakunode2_setup_rpc.nim | 2 +- waku/v2/node/wakunode2_types.nim | 50 -- .../waku_rln_relay/waku_rln_relay_utils.nim | 5 +- waku/waku.nim | 2 +- 48 files changed, 895 insertions(+), 886 deletions(-) create mode 100644 waku/v2/node/waku_node.nim delete mode 100644 waku/v2/node/wakunode2_types.nim diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index a261d14a0..32d3cd1d9 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -26,7 +26,7 @@ import ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store, - ../../waku/v2/node/[wakunode2, waku_payload], + ../../waku/v2/node/[waku_node, waku_payload], ../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/[peers, time], @@ -67,7 +67,7 @@ type Chat = ref object type PrivateKey* = crypto.PrivateKey - Topic* = wakunode2.Topic + Topic* = waku_node.Topic ##################### ## chat2 protobufs ## diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index ee8bd6f4c..4c29a3ab7 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -12,7 +12,7 @@ import libp2p/crypto/crypto, libp2p/errors, ../../../waku/v2/protocol/waku_message, - ../../../waku/v2/node/wakunode2, + ../../../waku/v2/node/waku_node, # Chat 2 imports ../chat2/chat2, # Common cli config diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index 219a278e4..c91f28455 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -19,7 +19,7 @@ import ../../waku/v2/utils/namespacing, ../../waku/v2/utils/time, ../../waku/v2/protocol/waku_message, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager/peer_manager, # Common cli config ./config_bridge @@ -51,7 +51,7 @@ type WakuBridge* = ref object of RootObj nodev1*: EthereumNode nodev2*: WakuNode - nodev2PubsubTopic: wakunode2.Topic # Pubsub topic to bridge to/from + nodev2PubsubTopic: waku_node.Topic # Pubsub topic to bridge to/from seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication. rng: ref HmacDrbgContext v1Pool: seq[Node] # Pool of v1 nodes for possible connections @@ -228,7 +228,7 @@ proc new*(T: type WakuBridge, nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nameResolver: NameResolver = nil, # Bridge configuration - nodev2PubsubTopic: wakunode2.Topic, + nodev2PubsubTopic: waku_node.Topic, v1Pool: seq[Node] = @[], targetV1Peers = 0): T {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} = diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index bb04e2662..217d9191e 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -8,7 +8,7 @@ import libp2p/crypto/[crypto,secp], eth/keys, json_rpc/[rpcclient, rpcserver], - ../../waku/v2/node/[config, wakunode2], + ../../waku/v2/node/[config, waku_node], ../../waku/common/utils/nat, ../../waku/v2/protocol/waku_message diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 958dfc967..696af045d 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -15,7 +15,7 @@ import import ../../waku/v1/node/rpc/hexstrings, ../../waku/v2/node/storage/message/waku_store_queue, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/jsonrpc/[store_api, relay_api, debug_api, @@ -231,7 +231,7 @@ procSuite "Waku v2 JSON-RPC API": # WakuStore setup let - key = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + key = crypto.PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.new(key) await node.mountStore(store=StoreQueueRef.new()) @@ -536,13 +536,13 @@ procSuite "Waku v2 JSON-RPC API": let locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() - filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() filterPeer = PeerInfo.new(filterKey, @[locationAddr]) - swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get() swapPeer = PeerInfo.new(swapKey, @[locationAddr]) - storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() storePeer = PeerInfo.new(storeKey, @[locationAddr]) node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) diff --git a/tests/v2/test_peer_exchange.nim b/tests/v2/test_peer_exchange.nim index a2cecd9d3..358dbdbef 100644 --- a/tests/v2/test_peer_exchange.nim +++ b/tests/v2/test_peer_exchange.nim @@ -10,7 +10,7 @@ import libp2p/crypto/crypto, libp2p/protocols/pubsub/gossipsub import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/utils/peers, ../test_helpers diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index d38fd25db..31a08d313 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -20,7 +20,7 @@ import ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/storage/peer/waku_peer_storage, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../test_helpers procSuite "Peer Manager": @@ -89,15 +89,15 @@ procSuite "Peer Manager": Port(60000)) # Create filter peer filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() - filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() filterPeer = PeerInfo.new(filterKey, @[filterLoc]) # Create swap peer swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet() - swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get() swapPeer = PeerInfo.new(swapKey, @[swapLoc]) # Create store peer storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet() - storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() storePeer = PeerInfo.new(storeKey, @[storeLoc]) await node.start() diff --git a/tests/v2/test_rest_debug_api.nim b/tests/v2/test_rest_debug_api.nim index 3ebba83af..d4f9810d2 100644 --- a/tests/v2/test_rest_debug_api.nim +++ b/tests/v2/test_rest_debug_api.nim @@ -9,7 +9,7 @@ import libp2p/multiaddress, libp2p/crypto/crypto import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/rest/[server, client, utils], ../../waku/v2/node/rest/debug/debug_api diff --git a/tests/v2/test_rest_relay_api.nim b/tests/v2/test_rest_relay_api.nim index 44028a639..ff921b0d8 100644 --- a/tests/v2/test_rest_relay_api.nim +++ b/tests/v2/test_rest_relay_api.nim @@ -11,7 +11,7 @@ import libp2p/protocols/pubsub/pubsub import ../../waku/v2/protocol/waku_message, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/rest/[server, client, base64, utils], ../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache] diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index 3ab282bbd..3f8052b6b 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -16,7 +16,7 @@ import import ../../waku/v1/protocol/waku_protocol, ../../waku/v2/protocol/waku_message, - ../../waku/v2/node/[wakunode2, waku_payload], + ../../waku/v2/node/[waku_node, waku_payload], ../../waku/v2/utils/peers, ../../apps/wakubridge/wakubridge, ../test_helpers diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 83942c18e..2c138c0d6 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -12,7 +12,7 @@ import eth/p2p/discoveryv5/enr, ../../waku/v2/protocol/waku_message, ../../waku/v2/node/discv5/waku_discv5, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../test_helpers procSuite "Waku Discovery v5": diff --git a/tests/v2/test_waku_dnsdisc.nim b/tests/v2/test_waku_dnsdisc.nim index a3d5669ca..a3da9ea7c 100644 --- a/tests/v2/test_waku_dnsdisc.nim +++ b/tests/v2/test_waku_dnsdisc.nim @@ -13,7 +13,7 @@ import import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/dnsdisc/waku_dnsdisc, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../test_helpers procSuite "Waku DNS Discovery": diff --git a/tests/v2/test_waku_keepalive.nim b/tests/v2/test_waku_keepalive.nim index ee5b49f6f..99c2bb98e 100644 --- a/tests/v2/test_waku_keepalive.nim +++ b/tests/v2/test_waku_keepalive.nim @@ -10,7 +10,7 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/multistream, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/utils/peers, ../test_helpers, ./utils diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index a07438b5d..0594682a2 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -12,7 +12,7 @@ import eth/keys, eth/p2p/discoveryv5/enr import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/discv5/waku_discv5, ../../waku/v2/protocol/waku_peer_exchange, diff --git a/tests/v2/test_waku_rln_relay.nim b/tests/v2/test_waku_rln_relay.nim index 0b9a9dfef..80be99552 100644 --- a/tests/v2/test_waku_rln_relay.nim +++ b/tests/v2/test_waku_rln_relay.nim @@ -13,7 +13,7 @@ import waku_rln_relay_types, waku_rln_relay_constants, waku_rln_relay_metrics], - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../test_helpers const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" diff --git a/tests/v2/test_waku_rln_relay_onchain.nim b/tests/v2/test_waku_rln_relay_onchain.nim index ccb6f3d3a..f1243a3f6 100644 --- a/tests/v2/test_waku_rln_relay_onchain.nim +++ b/tests/v2/test_waku_rln_relay_onchain.nim @@ -12,7 +12,7 @@ import waku_rln_relay_constants, waku_rln_relay_types, rln_relay_contract], - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../test_helpers, ./test_utils diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 1bc8def80..022afcff2 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -18,7 +18,7 @@ import ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/node/storage/message/waku_store_queue, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/utils/peers, ../../waku/v2/utils/time, ../test_helpers, diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 569daeaca..ca71e7002 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -14,7 +14,7 @@ import ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node procSuite "WakuNode": diff --git a/tests/v2/test_wakunode_filter.nim b/tests/v2/test_wakunode_filter.nim index 5119cf428..cfee2ba46 100644 --- a/tests/v2/test_wakunode_filter.nim +++ b/tests/v2/test_wakunode_filter.nim @@ -19,7 +19,7 @@ import ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node procSuite "WakuNode - Filter": diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index 635ce8956..469fcc0bc 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -13,7 +13,7 @@ import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, ../../waku/v2/utils/time, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node from std/times import getTime, toUnixFloat diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/test_wakunode_relay.nim index 693b92ac7..cad16061f 100644 --- a/tests/v2/test_wakunode_relay.nim +++ b/tests/v2/test_wakunode_relay.nim @@ -19,7 +19,7 @@ import ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node template sourceDir: string = currentSourcePath.parentDir() diff --git a/tests/v2/test_wakunode_rln_relay.nim b/tests/v2/test_wakunode_rln_relay.nim index 751a10046..85ed5b003 100644 --- a/tests/v2/test_wakunode_rln_relay.nim +++ b/tests/v2/test_wakunode_rln_relay.nim @@ -20,7 +20,7 @@ import waku_rln_relay_constants], ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/utils/peers, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node from std/times import epochTime diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index d75cae3ba..342ab9911 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -23,7 +23,7 @@ import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, ../../waku/v2/utils/time, - ../../waku/v2/node/wakunode2 + ../../waku/v2/node/waku_node from std/times import getTime, toUnixFloat diff --git a/tools/scripts/rpc_info.nim b/tools/scripts/rpc_info.nim index f8ff46760..1ebf2c270 100644 --- a/tools/scripts/rpc_info.nim +++ b/tools/scripts/rpc_info.nim @@ -6,7 +6,7 @@ import system, options import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, diff --git a/tools/scripts/rpc_publish.nim b/tools/scripts/rpc_publish.nim index 32390e1b8..579cc4eb6 100644 --- a/tools/scripts/rpc_publish.nim +++ b/tools/scripts/rpc_publish.nim @@ -6,7 +6,7 @@ import system, options import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, diff --git a/tools/scripts/rpc_query.nim b/tools/scripts/rpc_query.nim index 106f57b2b..518418e04 100644 --- a/tools/scripts/rpc_query.nim +++ b/tools/scripts/rpc_query.nim @@ -6,7 +6,7 @@ import system, options import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, diff --git a/tools/scripts/rpc_subscribe.nim b/tools/scripts/rpc_subscribe.nim index 93abc0ff2..09d62cf65 100644 --- a/tools/scripts/rpc_subscribe.nim +++ b/tools/scripts/rpc_subscribe.nim @@ -5,7 +5,7 @@ import system, options import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, diff --git a/tools/scripts/rpc_subscribe_filter.nim b/tools/scripts/rpc_subscribe_filter.nim index 90c1b4360..e3bd8cd8e 100644 --- a/tools/scripts/rpc_subscribe_filter.nim +++ b/tools/scripts/rpc_subscribe_filter.nim @@ -6,7 +6,7 @@ import system, options import - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, diff --git a/tools/simulation/quicksim2.nim b/tools/simulation/quicksim2.nim index 1159e4ae5..8321111a2 100644 --- a/tools/simulation/quicksim2.nim +++ b/tools/simulation/quicksim2.nim @@ -10,7 +10,7 @@ import ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_message, ../../waku/v2/utils/time, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/[jsonrpc_types,jsonrpc_utils] diff --git a/tools/wakucanary/wakucanary.nim b/tools/wakucanary/wakucanary.nim index 5c0dd53a8..e6f254097 100644 --- a/tools/wakucanary/wakucanary.nim +++ b/tools/wakucanary/wakucanary.nim @@ -10,7 +10,7 @@ import import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/node/wakunode2, + ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_payload, ../../waku/v2/utils/peers diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index d4c99a0c4..89d1a8cdb 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -6,13 +6,12 @@ import json_rpc/rpcserver, libp2p/[peerinfo, switch] import - ../../protocol/waku_message, ../../protocol/waku_store, ../../protocol/waku_filter, ../../protocol/waku_relay, ../../protocol/waku_swap/waku_swap, ../peer_manager/peer_manager, - ../wakunode2, + ../waku_node, ./jsonrpc_types export jsonrpc_types diff --git a/waku/v2/node/jsonrpc/debug_api.nim b/waku/v2/node/jsonrpc/debug_api.nim index bb9071184..0ae3a3fd6 100644 --- a/waku/v2/node/jsonrpc/debug_api.nim +++ b/waku/v2/node/jsonrpc/debug_api.nim @@ -3,7 +3,7 @@ import chronicles, json_rpc/rpcserver, - ../wakunode2 + ../waku_node logScope: topics = "debug api" diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 526aea59a..6f33121e7 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -7,7 +7,7 @@ import import ../../protocol/waku_message, ../../protocol/waku_filter, - ../wakunode2, + ../waku_node, ./jsonrpc_types export jsonrpc_types diff --git a/waku/v2/node/jsonrpc/private_api.nim b/waku/v2/node/jsonrpc/private_api.nim index 987f31fdb..4ffac4ff0 100644 --- a/waku/v2/node/jsonrpc/private_api.nim +++ b/waku/v2/node/jsonrpc/private_api.nim @@ -6,7 +6,7 @@ import eth/keys, json_rpc/rpcserver, nimcrypto/sysrand, - ../wakunode2, + ../waku_node, ../waku_payload, ./jsonrpc_types, ./jsonrpc_utils diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 047e2dc83..c909c010c 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -6,7 +6,7 @@ import json_rpc/rpcserver, libp2p/protocols/pubsub/pubsub, ../../protocol/waku_message, - ../wakunode2, + ../waku_node, ./jsonrpc_types, ./jsonrpc_utils diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index f4ad24b0a..582c9531f 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -5,7 +5,7 @@ import chronicles, json_rpc/rpcserver import - ../wakunode2, + ../waku_node, ../../protocol/waku_store, ../../utils/time, ./jsonrpc_types, diff --git a/waku/v2/node/rest/debug/api_types.nim b/waku/v2/node/rest/debug/api_types.nim index 4dfad279b..1c56de304 100644 --- a/waku/v2/node/rest/debug/api_types.nim +++ b/waku/v2/node/rest/debug/api_types.nim @@ -5,7 +5,7 @@ import json_serialization, json_serialization/std/options import ".."/serdes -import ../../wakunode2 +import ../../waku_node #### Types diff --git a/waku/v2/node/rest/debug/debug_api.nim b/waku/v2/node/rest/debug/debug_api.nim index 379b93df5..3810feb43 100644 --- a/waku/v2/node/rest/debug/debug_api.nim +++ b/waku/v2/node/rest/debug/debug_api.nim @@ -8,7 +8,7 @@ import presto/[route, client] import "."/api_types import ".."/[serdes, utils] -import ../../wakunode2 +import ../../waku_node logScope: topics = "rest_api_debug" diff --git a/waku/v2/node/rest/relay/relay_api.nim b/waku/v2/node/rest/relay/relay_api.nim index 967b71434..3c81d862f 100644 --- a/waku/v2/node/rest/relay/relay_api.nim +++ b/waku/v2/node/rest/relay/relay_api.nim @@ -8,7 +8,7 @@ import json_serialization/std/options, presto/[route, client, common] import - ../../wakunode2, + ../../waku_node, ../serdes, ../utils, ./api_types, diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim index 79553483f..54edec79f 100644 --- a/waku/v2/node/storage/message/dual_message_store.nim +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -1,7 +1,7 @@ {.push raises: [Defect].} import - std/[options, times], + std/options, stew/results, chronicles import diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim new file mode 100644 index 000000000..1261292d0 --- /dev/null +++ b/waku/v2/node/waku_node.nim @@ -0,0 +1,830 @@ +{.push raises: [Defect].} + +import + std/[hashes, options, tables, strutils, sequtils, os], + chronos, chronicles, metrics, + stew/shims/net as stewNet, + stew/byteutils, + eth/keys, + nimcrypto, + bearssl/rand, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + libp2p/protocols/ping, + libp2p/protocols/pubsub/[gossipsub, rpc/messages], + libp2p/nameresolving/nameresolver, + libp2p/[builders, multihash], + libp2p/transports/[transport, tcptransport, wstransport] +import + ../protocol/[waku_relay, waku_message], + ../protocol/waku_store, + ../protocol/waku_swap/waku_swap, + ../protocol/waku_filter, + ../protocol/waku_lightpush, + ../protocol/waku_rln_relay/waku_rln_relay_types, + ../protocol/waku_peer_exchange, + ../utils/[peers, requests, wakuenr], + ./peer_manager/peer_manager, + ./storage/message/waku_store_queue, + ./storage/message/message_retention_policy, + ./storage/message/message_retention_policy_capacity, + ./storage/message/message_retention_policy_time, + ./dnsdisc/waku_dnsdisc, + ./discv5/waku_discv5, + ./wakuswitch + +declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"] +declarePublicCounter waku_node_messages, "number of messages received", ["type"] +declarePublicGauge waku_node_filters, "number of content filter subscriptions" +declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] + +logScope: + topics = "wakunode" + +# Git version in git describe format (defined compile time) +const git_version* {.strdefine.} = "n/a" + +# Default clientId +const clientId* = "Nimbus Waku v2 node" + +# Default topic +const defaultTopic* = "/waku/2/default-waku/proto" + +# Default Waku Filter Timeout +const WakuFilterTimeout: Duration = 1.days + + +# key and crypto modules different +type + # XXX: Weird type, should probably be using pubsub Topic object name? + Topic* = string + Message* = seq[byte] + + WakuInfo* = object + # NOTE One for simplicity, can extend later as needed + listenAddresses*: seq[string] + enrUri*: string + #multiaddrStrings*: seq[string] + + # NOTE based on Eth2Node in NBC eth2_network.nim + WakuNode* = ref object of RootObj + peerManager*: PeerManager + switch*: Switch + wakuRelay*: WakuRelay + wakuStore*: WakuStore + wakuFilter*: WakuFilter + wakuSwap*: WakuSwap + wakuRlnRelay*: WakuRLNRelay + wakuLightPush*: WakuLightPush + wakuPeerExchange*: WakuPeerExchange + enr*: enr.Record + libp2pPing*: Ping + filters*: Filters + rng*: ref rand.HmacDrbgContext + wakuDiscv5*: WakuDiscoveryV5 + announcedAddresses* : seq[MultiAddress] + started*: bool # Indicates that node has started listening + + +proc protocolMatcher(codec: string): Matcher = + ## Returns a protocol matcher function for the provided codec + proc match(proto: string): bool {.gcsafe.} = + ## Matches a proto with any postfix to the provided codec. + ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos: + ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense` + return proto.startsWith(codec) + + return match + +proc updateSwitchPeerInfo(node: WakuNode) = + ## TODO: remove this when supported upstream + ## + ## nim-libp2p does not yet support announcing addrs + ## different from bound addrs. + ## + ## This is a temporary workaround to replace + ## peer info addrs in switch to announced + ## addresses. + ## + ## WARNING: this should only be called once the switch + ## has already been started. + + if node.announcedAddresses.len > 0: + node.switch.peerInfo.addrs = node.announcedAddresses + +template ip4TcpEndPoint(address, port): MultiAddress = + MultiAddress.init(address, tcpProtocol, port) + +template dns4Ma(dns4DomainName: string): MultiAddress = + MultiAddress.init("/dns4/" & dns4DomainName).tryGet() + +template tcpPortMa(port: Port): MultiAddress = + MultiAddress.init("/tcp/" & $port).tryGet() + +template dns4TcpEndPoint(dns4DomainName: string, port: Port): MultiAddress = + dns4Ma(dns4DomainName) & tcpPortMa(port) + +template wsFlag(wssEnabled: bool): MultiAddress = + if wssEnabled: MultiAddress.init("/wss").tryGet() + else: MultiAddress.init("/ws").tryGet() + +proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, + bindIp: ValidIpAddress, bindPort: Port, + extIp = none(ValidIpAddress), extPort = none(Port), + peerStorage: PeerStorage = nil, + maxConnections = builders.MaxConnections, + wsBindPort: Port = (Port)8000, + wsEnabled: bool = false, + wssEnabled: bool = false, + secureKey: string = "", + secureCert: string = "", + wakuFlags = none(WakuEnrBitfield), + nameResolver: NameResolver = nil, + sendSignedPeerRecord = false, + dns4DomainName = none(string), + discv5UdpPort = none(Port) + ): T + {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = + ## Creates a Waku Node. + ## + ## Status: Implemented. + ## + + ## Initialize addresses + let + # Bind addresses + hostAddress = ip4TcpEndPoint(bindIp, bindPort) + wsHostAddress = if wsEnabled or wssEnabled: some(ip4TcpEndPoint(bindIp, wsbindPort) & wsFlag(wssEnabled)) + else: none(MultiAddress) + + # Setup external addresses, if available + var + hostExtAddress, wsExtAddress = none(MultiAddress) + + if (dns4DomainName.isSome()): + # Use dns4 for externally announced addresses + hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get())) + + if (wsHostAddress.isSome()): + wsExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), wsBindPort) & wsFlag(wssEnabled)) + else: + # No public domain name, use ext IP if available + if extIp.isSome() and extPort.isSome(): + hostExtAddress = some(ip4TcpEndPoint(extIp.get(), extPort.get())) + + if (wsHostAddress.isSome()): + wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled)) + + var announcedAddresses: seq[MultiAddress] + if hostExtAddress.isSome: + announcedAddresses.add(hostExtAddress.get()) + else: + announcedAddresses.add(hostAddress) # We always have at least a bind address for the host + + if wsExtAddress.isSome: + announcedAddresses.add(wsExtAddress.get()) + elif wsHostAddress.isSome: + announcedAddresses.add(wsHostAddress.get()) + + ## Initialize peer + let + rng = crypto.newRng() + enrIp = if extIp.isSome(): extIp + else: some(bindIp) + enrTcpPort = if extPort.isSome(): extPort + else: some(bindPort) + enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field + elif wsHostAddress.isSome: @[wsHostAddress.get()] + else: @[] + enr = initEnr(nodeKey, + enrIp, + enrTcpPort, + discv5UdpPort, + wakuFlags, + enrMultiaddrs) + + info "Initializing networking", addrs=announcedAddresses + + var switch = newWakuSwitch(some(nodekey), + hostAddress, + wsHostAddress, + transportFlags = {ServerFlags.ReuseAddr}, + rng = rng, + maxConnections = maxConnections, + wssEnabled = wssEnabled, + secureKeyPath = secureKey, + secureCertPath = secureCert, + nameResolver = nameResolver, + sendSignedPeerRecord = sendSignedPeerRecord) + + let wakuNode = WakuNode( + peerManager: PeerManager.new(switch, peerStorage), + switch: switch, + rng: rng, + enr: enr, + filters: Filters.init(), + announcedAddresses: announcedAddresses + ) + + return wakuNode + +proc peerInfo*(node: WakuNode): PeerInfo = + node.switch.peerInfo + +proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = + if node.wakuRelay.isNil: + error "Invalid API call to `subscribe`. WakuRelay not mounted." + # @TODO improved error handling + return + + info "subscribe", topic=topic + + proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # A default handler should be registered for all topics + trace "Hit default handler", topic=topic, data=data + + let msg = WakuMessage.init(data) + if msg.isErr(): + # TODO: Add metric to track waku message decode errors + return + + + # Notify mounted protocols of new message + if not node.wakuFilter.isNil(): + await node.wakuFilter.handleMessage(topic, msg.value) + + if not node.wakuStore.isNil(): + node.wakuStore.handleMessage(topic, msg.value) + + waku_node_messages.inc(labelValues = ["relay"]) + + + let wakuRelay = node.wakuRelay + + if topic notin PubSub(wakuRelay).topics: + # Add default handler only for new topics + debug "Registering default handler", topic=topic + wakuRelay.subscribe(topic, defaultHandler) + + if handler.isSome: + debug "Registering handler", topic=topic + wakuRelay.subscribe(topic, handler.get()) + +proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = + ## Subscribes to a PubSub topic. Triggers handler when receiving messages on + ## this topic. TopicHandler is a method that takes a topic and some data. + ## + ## NOTE The data field SHOULD be decoded as a WakuMessage. + ## Status: Implemented. + node.subscribe(topic, some(handler)) + +proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + ## FilterHandler is a method that takes a MessagePush. + ## + ## Status: Implemented. + + # Sanity check for well-formed subscribe FilterRequest + doAssert(request.subscribe, "invalid subscribe request") + + info "subscribe content", filter=request + + var id = generateRequestId(node.rng) + + if node.wakuFilter.isNil == false: + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) + + if resSubscription.isOk(): + id = resSubscription.get() + else: + # Failed to subscribe + error "remote subscription to filter failed", filter = request + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + # Register handler for filter, whether remote subscription succeeded or not + node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) + waku_node_filters.set(node.filters.len.int64) + +proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = + ## Unsubscribes a handler from a PubSub topic. + ## + ## Status: Implemented. + if node.wakuRelay.isNil: + error "Invalid API call to `unsubscribe`. WakuRelay not mounted." + # @TODO improved error handling + return + + info "unsubscribe", topic=topic + + let wakuRelay = node.wakuRelay + wakuRelay.unsubscribe(@[(topic, handler)]) + +proc unsubscribeAll*(node: WakuNode, topic: Topic) = + ## Unsubscribes all handlers registered on a specific PubSub topic. + ## + ## Status: Implemented. + + if node.wakuRelay.isNil: + error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." + # @TODO improved error handling + return + + info "unsubscribeAll", topic=topic + + let wakuRelay = node.wakuRelay + wakuRelay.unsubscribeAll(topic) + + +proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = + ## Unsubscribe from a content filter. + ## + ## Status: Implemented. + + # Sanity check for well-formed unsubscribe FilterRequest + doAssert(request.subscribe == false, "invalid unsubscribe request") + + info "unsubscribe content", filter=request + + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) + node.filters.removeContentFilters(request.contentFilters) + + waku_node_filters.set(node.filters.len.int64) + + +proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = + ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a + ## `contentTopic` field for light node functionality. This field may be also + ## be omitted. + ## + ## Status: Implemented. + + if node.wakuRelay.isNil: + error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." + # @TODO improved error handling + return + + let wakuRelay = node.wakuRelay + trace "publish", topic=topic, contentTopic=message.contentTopic + var publishingMessage = message + + let data = message.encode().buffer + + discard await wakuRelay.publish(topic, data) + +proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = + ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. + ## Returns whether relaying was successful or not. + ## `WakuMessage` should contain a `contentTopic` field for light node + ## functionality. + debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic + + let rpc = PushRequest(pubSubTopic: topic, message: message) + return await node.wakuLightPush.request(rpc) + +proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = + discard await node.lightpush(topic, message) + +proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + ## Queries known nodes for historical messages + + # TODO: Once waku swap is less experimental, this can simplified + if node.wakuSwap.isNil: + debug "Using default query" + return await node.wakuStore.query(query) + else: + debug "Using SWAP accounting query" + # TODO: wakuSwap now part of wakuStore object + return await node.wakuStore.queryWithAccounting(query) + +proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online + ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) + ## messages are stored in the the wakuStore's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + if node.wakuStore.isNil(): + return + + let retrievedMessages = await node.wakuStore.resume(peerList) + if retrievedMessages.isErr(): + error "failed to resume store", error=retrievedMessages.error + return + + info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value + +# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc +proc info*(node: WakuNode): WakuInfo = + ## Returns information about the Node, such as what multiaddress it can be reached at. + ## + ## Status: Implemented. + ## + + let peerInfo = node.switch.peerInfo + + var listenStr : seq[string] + for address in node.announcedAddresses: + var fulladdr = $address & "/p2p/" & $peerInfo.peerId + listenStr &= fulladdr + let enrUri = node.enr.toUri() + let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri) + return wakuInfo + +proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = + info "mounting filter" + proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} = + + info "push received" + for message in msg.messages: + node.filters.notify(message, requestId) # Trigger filter handlers on a light node + + if not node.wakuStore.isNil and (requestId in node.filters): + let pubSubTopic = node.filters[requestId].pubSubTopic + node.wakuStore.handleMessage(pubSubTopic, message) + + waku_node_messages.inc(labelValues = ["filter"]) + + node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout) + if node.started: + # Node has started already. Let's start filter too. + await node.wakuFilter.start() + + node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) + + +# NOTE: If using the swap protocol, it must be mounted before store. This is +# because store is using a reference to the swap protocol. +proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.async, raises: [Defect, LPError].} = + info "mounting swap", mode = $swapConfig.mode + + node.wakuSwap = WakuSwap.init(node.peerManager, node.rng, swapConfig) + if node.started: + # Node has started already. Let's start swap too. + await node.wakuSwap.start() + + node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) + + +const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes + +proc executeMessageRetentionPolicy*(node: WakuNode) = + if node.wakuStore.isNil(): + return + + if node.wakuStore.store.isNil(): + return + + debug "executing message retention policy" + + node.wakuStore.executeMessageRetentionPolicy() + node.wakuStore.reportStoredMessagesMetric() + +proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) = + if node.wakuStore.isNil(): + return + + if node.wakuStore.store.isNil(): + return + + # https://github.com/nim-lang/Nim/issues/17369 + var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} + executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = + executeMessageRetentionPolicy(node) + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + +proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = + if node.wakuSwap.isNil(): + info "mounting waku store protocol (no waku swap)" + else: + info "mounting waku store protocol with waku swap support" + + node.wakuStore = WakuStore.init( + node.peerManager, + node.rng, + store, + wakuSwap=node.wakuSwap, + retentionPolicy=retentionPolicy + ) + + if node.started: + # Node has started already. Let's start store too. + await node.wakuStore.start() + + node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) + + +proc startRelay*(node: WakuNode) {.async.} = + if node.wakuRelay.isNil: + trace "Failed to start relay. Not mounted." + return + + ## Setup and start relay protocol + info "starting relay" + + # Topic subscriptions + for topic in node.wakuRelay.defaultTopics: + node.subscribe(topic, none(TopicHandler)) + + # Resume previous relay connections + if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): + info "Found previous WakuRelay peers. Reconnecting." + + # Reconnect to previous relay peers. This will respect a backoff period, if necessary + let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) + + await node.peerManager.reconnectPeers(WakuRelayCodec, + protocolMatcher(WakuRelayCodec), + backoffPeriod) + + # Start the WakuRelay protocol + await node.wakuRelay.start() + + info "relay started successfully" + +proc mountRelay*(node: WakuNode, + topics: seq[string] = newSeq[string](), + triggerSelf = true, + peerExchangeHandler = none(RoutingRecordsHandler)) + # @TODO: Better error handling: CatchableError is raised by `waitFor` + {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = + + proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] = + let mh = MultiHash.digest("sha2-256", m.data) + if mh.isOk(): + return ok(mh[].data.buffer) + else: + return ok(($m.data.hash).toBytes()) + + let wakuRelay = WakuRelay.init( + switch = node.switch, + msgIdProvider = msgIdProvider, + triggerSelf = triggerSelf, + sign = false, + verifySignature = false, + maxMessageSize = MaxWakuMessageSize + ) + + info "mounting relay" + + ## The default relay topics is the union of + ## all configured topics plus the hard-coded defaultTopic(s) + wakuRelay.defaultTopics = concat(@[defaultTopic], topics) + + ## Add peer exchange handler + if peerExchangeHandler.isSome(): + wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p + wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) + + node.wakuRelay = wakuRelay + if node.started: + # Node has started already. Let's start relay too. + await node.startRelay() + + node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) + + info "relay mounted successfully" + +proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = + info "mounting light push" + + if node.wakuRelay.isNil: + debug "mounting lightpush without relay" + node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil) + else: + debug "mounting lightpush with relay" + node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay) + + if node.started: + # Node has started already. Let's start lightpush too. + await node.wakuLightPush.start() + + node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) + +proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = + info "mounting waku peer exchange" + + var discv5Opt: Option[WakuDiscoveryV5] + if not node.wakuDiscV5.isNil(): + discv5Opt = some(node.wakuDiscV5) + node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt) + + if node.started: + # Node has started already. Let's start Waku peer exchange too. + await node.wakuPeerExchange.start() + + node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) + +proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = + info "mounting libp2p ping protocol" + + try: + node.libp2pPing = Ping.new(rng = node.rng) + except Exception as e: + # This is necessary as `Ping.new*` does not have explicit `raises` requirement + # @TODO: remove exception handling once explicit `raises` in ping module + raise newException(LPError, "Failed to initialize ping protocol") + + if node.started: + # Node has started already. Let's start ping too. + await node.libp2pPing.start() + + node.switch.mount(node.libp2pPing) + +proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = + while node.started: + # Keep all connected peers alive while running + trace "Running keepalive" + + # First get a list of connected peer infos + let peers = node.peerManager.peers() + .filterIt(node.peerManager.connectedness(it.peerId) == Connected) + .mapIt(it.toRemotePeerInfo()) + + # Attempt to retrieve and ping the active outgoing connection for each peer + for peer in peers: + let connOpt = await node.peerManager.dialPeer(peer, PingCodec) + + if connOpt.isNone: + # @TODO more sophisticated error handling here + debug "failed to connect to remote peer", peer=peer + waku_node_errors.inc(labelValues = ["keep_alive_failure"]) + return + + discard await node.libp2pPing.ping(connOpt.get()) # Ping connection + + await sleepAsync(keepalive) + +proc startKeepalive*(node: WakuNode) = + let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration + + info "starting keepalive", keepalive=defaultKeepalive + + asyncSpawn node.keepaliveLoop(defaultKeepalive) + +proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) = + n.wakuStore.setPeer(peer) + +proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = + info "Set store peer", address = address + + let peer = parseRemotePeerInfo(address) + n.setStorePeer(peer) + +proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) = + n.wakuFilter.setPeer(peer) + +proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = + info "Set filter peer", address = address + + let peer = parseRemotePeerInfo(address) + n.setFilterPeer(peer) + +proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) = + n.wakuLightPush.setPeer(peer) + +proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = + info "Set lightpush peer", address = address + + let peer = parseRemotePeerInfo(address) + n.wakuLightPush.setPeer(peer) + +proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = + info "Set peer exchange peer", address = address + + let remotePeer = parseRemotePeerInfo(address) + + n.wakuPeerExchange.setPeer(remotePeer) + +proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = + ## `source` indicates source of node addrs (static config, api call, discovery, etc) + # NOTE This is dialing on WakuRelay protocol specifically + await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source) + + +proc runDiscv5Loop(node: WakuNode) {.async.} = + ## Continuously add newly discovered nodes + ## using Node Discovery v5 + if (node.wakuDiscv5.isNil): + warn "Trying to run discovery v5 while it's disabled" + return + + info "Starting discovery loop" + + while node.wakuDiscv5.listening: + trace "Running discovery loop" + ## Query for a random target and collect all discovered nodes + ## @TODO: we could filter nodes here + let discoveredPeers = await node.wakuDiscv5.findRandomPeers() + if discoveredPeers.isOk: + ## Let's attempt to connect to peers we + ## have not encountered before + + trace "Discovered peers", count=discoveredPeers.get().len() + + let newPeers = discoveredPeers.get().filterIt( + not node.switch.isConnected(it.peerId)) + + if newPeers.len > 0: + debug "Connecting to newly discovered peers", count=newPeers.len() + await connectToNodes(node, newPeers, "discv5") + + # Discovery `queryRandom` can have a synchronous fast path for example + # when no peers are in the routing table. Don't run it in continuous loop. + # + # Also, give some time to dial the discovered nodes and update stats etc + await sleepAsync(5.seconds) + +proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = + ## Start Discovery v5 service + + info "Starting discovery v5 service" + + if not node.wakuDiscv5.isNil: + ## First start listening on configured port + try: + trace "Start listening on discv5 port" + node.wakuDiscv5.open() + except CatchableError: + error "Failed to start discovery service. UDP port may be already in use" + return false + + ## Start Discovery v5 + trace "Start discv5 service" + node.wakuDiscv5.start() + trace "Start discovering new peers using discv5" + + asyncSpawn node.runDiscv5Loop() + + debug "Successfully started discovery v5 service" + info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() + return true + + return false + +proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = + ## Stop Discovery v5 service + + if not node.wakuDiscv5.isNil: + info "Stopping discovery v5 service" + + ## Stop Discovery v5 process and close listening port + if node.wakuDiscv5.listening: + trace "Stop listening on discv5 port" + await node.wakuDiscv5.closeWait() + + debug "Successfully stopped discovery v5 service" + +proc start*(node: WakuNode) {.async.} = + ## Starts a created Waku Node and + ## all its mounted protocols. + ## + ## Status: Implemented. + + waku_version.set(1, labelValues=[git_version]) + + ## NB: careful when moving this. We need to start the switch with the bind address + ## BEFORE updating with announced addresses for the sake of identify. + await node.switch.start() + + let peerInfo = node.switch.peerInfo + info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs + var listenStr = "" + for address in node.announcedAddresses: + var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" + listenStr &= fulladdr + + ## XXX: this should be /ip4..., / stripped? + info "Listening on", full = listenStr + info "DNS: discoverable ENR ", enr = node.enr.toUri() + + # Perform relay-specific startup tasks TODO: this should be rethought + if not node.wakuRelay.isNil: + await node.startRelay() + + ## Update switch peer info with announced addrs + node.updateSwitchPeerInfo() + + node.started = true + + info "Node started successfully" + +proc stop*(node: WakuNode) {.async.} = + if not node.wakuRelay.isNil: + await node.wakuRelay.stop() + + if not node.wakuDiscv5.isNil: + discard await node.stopDiscv5() + + await node.switch.stop() + + node.started = false \ No newline at end of file diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index f2073baa2..e495850d1 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -1,806 +1,31 @@ {.push raises: [Defect].} import - std/[hashes, options, tables, strutils, sequtils, os], + std/[options, tables, strutils, sequtils, os], chronos, chronicles, metrics, stew/shims/net as stewNet, - stew/byteutils, eth/keys, - nimcrypto, eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, libp2p/protocols/ping, libp2p/protocols/pubsub/[gossipsub, rpc/messages], - libp2p/nameresolving/nameresolver, libp2p/[builders, multihash], - libp2p/transports/[transport, tcptransport, wstransport] + libp2p/transports/[transport, wstransport] import - ../protocol/[waku_relay, waku_message], ../protocol/waku_store, - ../protocol/waku_swap/waku_swap, ../protocol/waku_filter, - ../protocol/waku_lightpush, ../protocol/waku_rln_relay/waku_rln_relay_types, ../protocol/waku_peer_exchange, - ../utils/[peers, requests, wakuenr], + ../utils/[peers, wakuenr], ./peer_manager/peer_manager, ./storage/message/waku_store_queue, - ./storage/message/message_retention_policy, ./storage/message/message_retention_policy_capacity, ./storage/message/message_retention_policy_time, ./dnsdisc/waku_dnsdisc, ./discv5/waku_discv5, ./wakuswitch, - ./wakunode2_types + ./waku_node -export - wakunode2_types - -when defined(rln) or defined(rlnzerokit): - import ../protocol/waku_rln_relay/waku_rln_relay_utils - -declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"] -declarePublicCounter waku_node_messages, "number of messages received", ["type"] -declarePublicGauge waku_node_filters, "number of content filter subscriptions" -declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] - -logScope: - topics = "wakunode" - -# Git version in git describe format (defined compile time) -const git_version* {.strdefine.} = "n/a" - -# Default clientId -const clientId* = "Nimbus Waku v2 node" - -# Default topic -const defaultTopic* = "/waku/2/default-waku/proto" - -# Default Waku Filter Timeout -const WakuFilterTimeout: Duration = 1.days - -proc protocolMatcher(codec: string): Matcher = - ## Returns a protocol matcher function for the provided codec - proc match(proto: string): bool {.gcsafe.} = - ## Matches a proto with any postfix to the provided codec. - ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos: - ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense` - return proto.startsWith(codec) - - return match - -proc updateSwitchPeerInfo(node: WakuNode) = - ## TODO: remove this when supported upstream - ## - ## nim-libp2p does not yet support announcing addrs - ## different from bound addrs. - ## - ## This is a temporary workaround to replace - ## peer info addrs in switch to announced - ## addresses. - ## - ## WARNING: this should only be called once the switch - ## has already been started. - - if node.announcedAddresses.len > 0: - node.switch.peerInfo.addrs = node.announcedAddresses - -template ip4TcpEndPoint(address, port): MultiAddress = - MultiAddress.init(address, tcpProtocol, port) - -template dns4Ma(dns4DomainName: string): MultiAddress = - MultiAddress.init("/dns4/" & dns4DomainName).tryGet() - -template tcpPortMa(port: Port): MultiAddress = - MultiAddress.init("/tcp/" & $port).tryGet() - -template dns4TcpEndPoint(dns4DomainName: string, port: Port): MultiAddress = - dns4Ma(dns4DomainName) & tcpPortMa(port) - -template wsFlag(wssEnabled: bool): MultiAddress = - if wssEnabled: MultiAddress.init("/wss").tryGet() - else: MultiAddress.init("/ws").tryGet() - -proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, - bindIp: ValidIpAddress, bindPort: Port, - extIp = none(ValidIpAddress), extPort = none(Port), - peerStorage: PeerStorage = nil, - maxConnections = builders.MaxConnections, - wsBindPort: Port = (Port)8000, - wsEnabled: bool = false, - wssEnabled: bool = false, - secureKey: string = "", - secureCert: string = "", - wakuFlags = none(WakuEnrBitfield), - nameResolver: NameResolver = nil, - sendSignedPeerRecord = false, - dns4DomainName = none(string), - discv5UdpPort = none(Port) - ): T - {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = - ## Creates a Waku Node. - ## - ## Status: Implemented. - ## - - ## Initialize addresses - let - # Bind addresses - hostAddress = ip4TcpEndPoint(bindIp, bindPort) - wsHostAddress = if wsEnabled or wssEnabled: some(ip4TcpEndPoint(bindIp, wsbindPort) & wsFlag(wssEnabled)) - else: none(MultiAddress) - - # Setup external addresses, if available - var - hostExtAddress, wsExtAddress = none(MultiAddress) - - if (dns4DomainName.isSome()): - # Use dns4 for externally announced addresses - hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get())) - - if (wsHostAddress.isSome()): - wsExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), wsBindPort) & wsFlag(wssEnabled)) - else: - # No public domain name, use ext IP if available - if extIp.isSome() and extPort.isSome(): - hostExtAddress = some(ip4TcpEndPoint(extIp.get(), extPort.get())) - - if (wsHostAddress.isSome()): - wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled)) - - var announcedAddresses: seq[MultiAddress] - if hostExtAddress.isSome: - announcedAddresses.add(hostExtAddress.get()) - else: - announcedAddresses.add(hostAddress) # We always have at least a bind address for the host - - if wsExtAddress.isSome: - announcedAddresses.add(wsExtAddress.get()) - elif wsHostAddress.isSome: - announcedAddresses.add(wsHostAddress.get()) - - ## Initialize peer - let - rng = crypto.newRng() - enrIp = if extIp.isSome(): extIp - else: some(bindIp) - enrTcpPort = if extPort.isSome(): extPort - else: some(bindPort) - enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field - elif wsHostAddress.isSome: @[wsHostAddress.get()] - else: @[] - enr = initEnr(nodeKey, - enrIp, - enrTcpPort, - discv5UdpPort, - wakuFlags, - enrMultiaddrs) - - info "Initializing networking", addrs=announcedAddresses - - var switch = newWakuSwitch(some(nodekey), - hostAddress, - wsHostAddress, - transportFlags = {ServerFlags.ReuseAddr}, - rng = rng, - maxConnections = maxConnections, - wssEnabled = wssEnabled, - secureKeyPath = secureKey, - secureCertPath = secureCert, - nameResolver = nameResolver, - sendSignedPeerRecord = sendSignedPeerRecord) - - let wakuNode = WakuNode( - peerManager: PeerManager.new(switch, peerStorage), - switch: switch, - rng: rng, - enr: enr, - filters: Filters.init(), - announcedAddresses: announcedAddresses - ) - - return wakuNode - -proc peerInfo*(node: WakuNode): PeerInfo = - node.switch.peerInfo - -proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = - if node.wakuRelay.isNil: - error "Invalid API call to `subscribe`. WakuRelay not mounted." - # @TODO improved error handling - return - - info "subscribe", topic=topic - - proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # A default handler should be registered for all topics - trace "Hit default handler", topic=topic, data=data - - let msg = WakuMessage.init(data) - if msg.isErr(): - # TODO: Add metric to track waku message decode errors - return - - - # Notify mounted protocols of new message - if not node.wakuFilter.isNil(): - await node.wakuFilter.handleMessage(topic, msg.value) - - if not node.wakuStore.isNil(): - node.wakuStore.handleMessage(topic, msg.value) - - waku_node_messages.inc(labelValues = ["relay"]) - - - let wakuRelay = node.wakuRelay - - if topic notin PubSub(wakuRelay).topics: - # Add default handler only for new topics - debug "Registering default handler", topic=topic - wakuRelay.subscribe(topic, defaultHandler) - - if handler.isSome: - debug "Registering handler", topic=topic - wakuRelay.subscribe(topic, handler.get()) - -proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = - ## Subscribes to a PubSub topic. Triggers handler when receiving messages on - ## this topic. TopicHandler is a method that takes a topic and some data. - ## - ## NOTE The data field SHOULD be decoded as a WakuMessage. - ## Status: Implemented. - node.subscribe(topic, some(handler)) - -proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - ## FilterHandler is a method that takes a MessagePush. - ## - ## Status: Implemented. - - # Sanity check for well-formed subscribe FilterRequest - doAssert(request.subscribe, "invalid subscribe request") - - info "subscribe content", filter=request - - var id = generateRequestId(node.rng) - - if node.wakuFilter.isNil == false: - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) - - if resSubscription.isOk(): - id = resSubscription.get() - else: - # Failed to subscribe - error "remote subscription to filter failed", filter = request - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - - # Register handler for filter, whether remote subscription succeeded or not - node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) - waku_node_filters.set(node.filters.len.int64) - -proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = - ## Unsubscribes a handler from a PubSub topic. - ## - ## Status: Implemented. - if node.wakuRelay.isNil: - error "Invalid API call to `unsubscribe`. WakuRelay not mounted." - # @TODO improved error handling - return - - info "unsubscribe", topic=topic - - let wakuRelay = node.wakuRelay - wakuRelay.unsubscribe(@[(topic, handler)]) - -proc unsubscribeAll*(node: WakuNode, topic: Topic) = - ## Unsubscribes all handlers registered on a specific PubSub topic. - ## - ## Status: Implemented. - - if node.wakuRelay.isNil: - error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." - # @TODO improved error handling - return - - info "unsubscribeAll", topic=topic - - let wakuRelay = node.wakuRelay - wakuRelay.unsubscribeAll(topic) - - -proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = - ## Unsubscribe from a content filter. - ## - ## Status: Implemented. - - # Sanity check for well-formed unsubscribe FilterRequest - doAssert(request.subscribe == false, "invalid unsubscribe request") - - info "unsubscribe content", filter=request - - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) - node.filters.removeContentFilters(request.contentFilters) - - waku_node_filters.set(node.filters.len.int64) - - -proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = - ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a - ## `contentTopic` field for light node functionality. This field may be also - ## be omitted. - ## - ## Status: Implemented. - - if node.wakuRelay.isNil: - error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." - # @TODO improved error handling - return - - let wakuRelay = node.wakuRelay - trace "publish", topic=topic, contentTopic=message.contentTopic - var publishingMessage = message - - let data = message.encode().buffer - - discard await wakuRelay.publish(topic, data) - -proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = - ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. - ## Returns whether relaying was successful or not. - ## `WakuMessage` should contain a `contentTopic` field for light node - ## functionality. - debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic - - let rpc = PushRequest(pubSubTopic: topic, message: message) - return await node.wakuLightPush.request(rpc) - -proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = - discard await node.lightpush(topic, message) - -proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = - ## Queries known nodes for historical messages - - # TODO: Once waku swap is less experimental, this can simplified - if node.wakuSwap.isNil: - debug "Using default query" - return await node.wakuStore.query(query) - else: - debug "Using SWAP accounting query" - # TODO: wakuSwap now part of wakuStore object - return await node.wakuStore.queryWithAccounting(query) - -proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online - ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) - ## messages are stored in the the wakuStore's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message - ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. - ## The history gets fetched successfully if the dialed peer has been online during the queried time window. - if node.wakuStore.isNil(): - return - - let retrievedMessages = await node.wakuStore.resume(peerList) - if retrievedMessages.isErr(): - error "failed to resume store", error=retrievedMessages.error - return - - info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value - -# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc -proc info*(node: WakuNode): WakuInfo = - ## Returns information about the Node, such as what multiaddress it can be reached at. - ## - ## Status: Implemented. - ## - - let peerInfo = node.switch.peerInfo - - var listenStr : seq[string] - for address in node.announcedAddresses: - var fulladdr = $address & "/p2p/" & $peerInfo.peerId - listenStr &= fulladdr - let enrUri = node.enr.toUri() - let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri) - return wakuInfo - -proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = - info "mounting filter" - proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} = - - info "push received" - for message in msg.messages: - node.filters.notify(message, requestId) # Trigger filter handlers on a light node - - if not node.wakuStore.isNil and (requestId in node.filters): - let pubSubTopic = node.filters[requestId].pubSubTopic - node.wakuStore.handleMessage(pubSubTopic, message) - - waku_node_messages.inc(labelValues = ["filter"]) - - node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout) - if node.started: - # Node has started already. Let's start filter too. - await node.wakuFilter.start() - - node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) - - -# NOTE: If using the swap protocol, it must be mounted before store. This is -# because store is using a reference to the swap protocol. -proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.async, raises: [Defect, LPError].} = - info "mounting swap", mode = $swapConfig.mode - - node.wakuSwap = WakuSwap.init(node.peerManager, node.rng, swapConfig) - if node.started: - # Node has started already. Let's start swap too. - await node.wakuSwap.start() - - node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) - - -const MessageStoreDefaultRetentionPolicyInterval = 30.minutes - -proc executeMessageRetentionPolicy(node: WakuNode) = - if node.wakuStore.isNil(): - return - - if node.wakuStore.store.isNil(): - return - - debug "executing message retention policy" - - node.wakuStore.executeMessageRetentionPolicy() - node.wakuStore.reportStoredMessagesMetric() - -proc startMessageRetentionPolicyPeriodicTask(node: WakuNode, interval: Duration) = - if node.wakuStore.isNil(): - return - - if node.wakuStore.store.isNil(): - return - - # https://github.com/nim-lang/Nim/issues/17369 - var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} - executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = - executeMessageRetentionPolicy(node) - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) - - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) - -proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} = - if node.wakuSwap.isNil(): - info "mounting waku store protocol (no waku swap)" - else: - info "mounting waku store protocol with waku swap support" - - node.wakuStore = WakuStore.init( - node.peerManager, - node.rng, - store, - wakuSwap=node.wakuSwap, - retentionPolicy=retentionPolicy - ) - - if node.started: - # Node has started already. Let's start store too. - await node.wakuStore.start() - - node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) - - -proc startRelay*(node: WakuNode) {.async.} = - if node.wakuRelay.isNil: - trace "Failed to start relay. Not mounted." - return - - ## Setup and start relay protocol - info "starting relay" - - # Topic subscriptions - for topic in node.wakuRelay.defaultTopics: - node.subscribe(topic, none(TopicHandler)) - - # Resume previous relay connections - if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): - info "Found previous WakuRelay peers. Reconnecting." - - # Reconnect to previous relay peers. This will respect a backoff period, if necessary - let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) - - await node.peerManager.reconnectPeers(WakuRelayCodec, - protocolMatcher(WakuRelayCodec), - backoffPeriod) - - # Start the WakuRelay protocol - await node.wakuRelay.start() - - info "relay started successfully" - -proc mountRelay*(node: WakuNode, - topics: seq[string] = newSeq[string](), - triggerSelf = true, - peerExchangeHandler = none(RoutingRecordsHandler)) - # @TODO: Better error handling: CatchableError is raised by `waitFor` - {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = - - proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] = - let mh = MultiHash.digest("sha2-256", m.data) - if mh.isOk(): - return ok(mh[].data.buffer) - else: - return ok(($m.data.hash).toBytes()) - - let wakuRelay = WakuRelay.init( - switch = node.switch, - msgIdProvider = msgIdProvider, - triggerSelf = triggerSelf, - sign = false, - verifySignature = false, - maxMessageSize = MaxWakuMessageSize - ) - - info "mounting relay" - - ## The default relay topics is the union of - ## all configured topics plus the hard-coded defaultTopic(s) - wakuRelay.defaultTopics = concat(@[defaultTopic], topics) - - ## Add peer exchange handler - if peerExchangeHandler.isSome(): - wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p - wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) - - node.wakuRelay = wakuRelay - if node.started: - # Node has started already. Let's start relay too. - await node.startRelay() - - node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) - - info "relay mounted successfully" - -proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = - info "mounting light push" - - if node.wakuRelay.isNil: - debug "mounting lightpush without relay" - node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil) - else: - debug "mounting lightpush with relay" - node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay) - - if node.started: - # Node has started already. Let's start lightpush too. - await node.wakuLightPush.start() - - node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) - -proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = - info "mounting waku peer exchange" - - var discv5Opt: Option[WakuDiscoveryV5] - if not node.wakuDiscV5.isNil(): - discv5Opt = some(node.wakuDiscV5) - node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt) - - if node.started: - # Node has started already. Let's start Waku peer exchange too. - await node.wakuPeerExchange.start() - - node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) - -proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = - info "mounting libp2p ping protocol" - - try: - node.libp2pPing = Ping.new(rng = node.rng) - except Exception as e: - # This is necessary as `Ping.new*` does not have explicit `raises` requirement - # @TODO: remove exception handling once explicit `raises` in ping module - raise newException(LPError, "Failed to initialize ping protocol") - - if node.started: - # Node has started already. Let's start ping too. - await node.libp2pPing.start() - - node.switch.mount(node.libp2pPing) - -proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = - while node.started: - # Keep all connected peers alive while running - trace "Running keepalive" - - # First get a list of connected peer infos - let peers = node.peerManager.peers() - .filterIt(node.peerManager.connectedness(it.peerId) == Connected) - .mapIt(it.toRemotePeerInfo()) - - # Attempt to retrieve and ping the active outgoing connection for each peer - for peer in peers: - let connOpt = await node.peerManager.dialPeer(peer, PingCodec) - - if connOpt.isNone: - # @TODO more sophisticated error handling here - debug "failed to connect to remote peer", peer=peer - waku_node_errors.inc(labelValues = ["keep_alive_failure"]) - return - - discard await node.libp2pPing.ping(connOpt.get()) # Ping connection - - await sleepAsync(keepalive) - -proc startKeepalive*(node: WakuNode) = - let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration - - info "starting keepalive", keepalive=defaultKeepalive - - asyncSpawn node.keepaliveLoop(defaultKeepalive) - -proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuStore.setPeer(peer) - -proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set store peer", address = address - - let peer = parseRemotePeerInfo(address) - n.setStorePeer(peer) - -proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuFilter.setPeer(peer) - -proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set filter peer", address = address - - let peer = parseRemotePeerInfo(address) - n.setFilterPeer(peer) - -proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuLightPush.setPeer(peer) - -proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set lightpush peer", address = address - - let peer = parseRemotePeerInfo(address) - n.wakuLightPush.setPeer(peer) - -proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set peer exchange peer", address = address - - let remotePeer = parseRemotePeerInfo(address) - - n.wakuPeerExchange.setPeer(remotePeer) - -proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - # NOTE This is dialing on WakuRelay protocol specifically - await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source) - - -proc runDiscv5Loop(node: WakuNode) {.async.} = - ## Continuously add newly discovered nodes - ## using Node Discovery v5 - if (node.wakuDiscv5.isNil): - warn "Trying to run discovery v5 while it's disabled" - return - - info "Starting discovery loop" - - while node.wakuDiscv5.listening: - trace "Running discovery loop" - ## Query for a random target and collect all discovered nodes - ## @TODO: we could filter nodes here - let discoveredPeers = await node.wakuDiscv5.findRandomPeers() - if discoveredPeers.isOk: - ## Let's attempt to connect to peers we - ## have not encountered before - - trace "Discovered peers", count=discoveredPeers.get().len() - - let newPeers = discoveredPeers.get().filterIt( - not node.switch.isConnected(it.peerId)) - - if newPeers.len > 0: - debug "Connecting to newly discovered peers", count=newPeers.len() - await connectToNodes(node, newPeers, "discv5") - - # Discovery `queryRandom` can have a synchronous fast path for example - # when no peers are in the routing table. Don't run it in continuous loop. - # - # Also, give some time to dial the discovered nodes and update stats etc - await sleepAsync(5.seconds) - -proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = - ## Start Discovery v5 service - - info "Starting discovery v5 service" - - if not node.wakuDiscv5.isNil: - ## First start listening on configured port - try: - trace "Start listening on discv5 port" - node.wakuDiscv5.open() - except CatchableError: - error "Failed to start discovery service. UDP port may be already in use" - return false - - ## Start Discovery v5 - trace "Start discv5 service" - node.wakuDiscv5.start() - trace "Start discovering new peers using discv5" - - asyncSpawn node.runDiscv5Loop() - - debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() - return true - - return false - -proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = - ## Stop Discovery v5 service - - if not node.wakuDiscv5.isNil: - info "Stopping discovery v5 service" - - ## Stop Discovery v5 process and close listening port - if node.wakuDiscv5.listening: - trace "Stop listening on discv5 port" - await node.wakuDiscv5.closeWait() - - debug "Successfully stopped discovery v5 service" - -proc start*(node: WakuNode) {.async.} = - ## Starts a created Waku Node and - ## all its mounted protocols. - ## - ## Status: Implemented. - - waku_version.set(1, labelValues=[git_version]) - - ## NB: careful when moving this. We need to start the switch with the bind address - ## BEFORE updating with announced addresses for the sake of identify. - await node.switch.start() - - let peerInfo = node.switch.peerInfo - info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs - var listenStr = "" - for address in node.announcedAddresses: - var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" - listenStr &= fulladdr - - ## XXX: this should be /ip4..., / stripped? - info "Listening on", full = listenStr - info "DNS: discoverable ENR ", enr = node.enr.toUri() - - # Perform relay-specific startup tasks TODO: this should be rethought - if not node.wakuRelay.isNil: - await node.startRelay() - - ## Update switch peer info with announced addrs - node.updateSwitchPeerInfo() - - node.started = true - - info "Node started successfully" - -proc stop*(node: WakuNode) {.async.} = - if not node.wakuRelay.isNil: - await node.wakuRelay.stop() - - if not node.wakuDiscv5.isNil: - discard await node.stopDiscv5() - - await node.switch.stop() - - node.started = false {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError when isMainModule: @@ -827,6 +52,10 @@ when isMainModule: ./storage/message/dual_message_store, ./storage/message/sqlite_store, ./storage/peer/waku_peer_storage + + when defined(rln) or defined(rlnzerokit): + import ../protocol/waku_rln_relay/waku_rln_relay_utils + logScope: topics = "wakunode.setup" diff --git a/waku/v2/node/wakunode2_setup_metrics.nim b/waku/v2/node/wakunode2_setup_metrics.nim index 091cb29df..9da0a7c64 100644 --- a/waku/v2/node/wakunode2_setup_metrics.nim +++ b/waku/v2/node/wakunode2_setup_metrics.nim @@ -8,7 +8,7 @@ import metrics, metrics/chronos_httpserver, ./config, - ./wakunode2, + ./waku_node, ./peer_manager/peer_manager, ../protocol/waku_filter, ../protocol/waku_store, diff --git a/waku/v2/node/wakunode2_setup_rest.nim b/waku/v2/node/wakunode2_setup_rest.nim index 387cd28e5..a83313c64 100644 --- a/waku/v2/node/wakunode2_setup_rest.nim +++ b/waku/v2/node/wakunode2_setup_rest.nim @@ -6,7 +6,7 @@ import presto import ./config, - ./wakunode2, + ./waku_node, ./rest/server, ./rest/debug/debug_api, ./rest/relay/[relay_api, topic_cache] diff --git a/waku/v2/node/wakunode2_setup_rpc.nim b/waku/v2/node/wakunode2_setup_rpc.nim index 58789c1aa..749011e93 100644 --- a/waku/v2/node/wakunode2_setup_rpc.nim +++ b/waku/v2/node/wakunode2_setup_rpc.nim @@ -8,7 +8,7 @@ import import ./config, ../protocol/waku_message, - ./wakunode2, + ./waku_node, ./jsonrpc/[admin_api, debug_api, filter_api, diff --git a/waku/v2/node/wakunode2_types.nim b/waku/v2/node/wakunode2_types.nim deleted file mode 100644 index 744dfc37c..000000000 --- a/waku/v2/node/wakunode2_types.nim +++ /dev/null @@ -1,50 +0,0 @@ -import - bearssl/rand, - eth/p2p/discoveryv5/enr, - libp2p/crypto/crypto, - libp2p/protocols/ping, - ../protocol/waku_relay, - ../protocol/waku_store, - ../protocol/waku_swap/waku_swap, - ../protocol/waku_filter, - ../protocol/waku_lightpush, - ../protocol/waku_peer_exchange, - ../protocol/waku_rln_relay/waku_rln_relay_types, - ./peer_manager/peer_manager, - ./discv5/waku_discv5 - - -# key and crypto modules different -type - KeyPair* = crypto.KeyPair - PublicKey* = crypto.PublicKey - PrivateKey* = crypto.PrivateKey - - # XXX: Weird type, should probably be using pubsub Topic object name? - Topic* = string - Message* = seq[byte] - - WakuInfo* = object - # NOTE One for simplicity, can extend later as needed - listenAddresses*: seq[string] - enrUri*: string - #multiaddrStrings*: seq[string] - - # NOTE based on Eth2Node in NBC eth2_network.nim - WakuNode* = ref object of RootObj - peerManager*: PeerManager - switch*: Switch - wakuRelay*: WakuRelay - wakuStore*: WakuStore - wakuFilter*: WakuFilter - wakuSwap*: WakuSwap - wakuRlnRelay*: WakuRLNRelay - wakuLightPush*: WakuLightPush - wakuPeerExchange*: WakuPeerExchange - enr*: enr.Record - libp2pPing*: Ping - filters*: Filters - rng*: ref rand.HmacDrbgContext - wakuDiscv5*: WakuDiscoveryV5 - announcedAddresses* : seq[MultiAddress] - started*: bool # Indicates that node has started listening diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 8de2866e1..30af8f870 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -16,8 +16,9 @@ import waku_rln_relay_types, waku_rln_relay_metrics, ../../utils/time, - ../../node/[wakunode2_types,config], - ../../../../../apps/chat2/config_chat2, + ../../node/waku_node, + ../../node/config, ## TODO: Decouple the protocol code from the app configuration + ../../../../../apps/chat2/config_chat2, ## TODO: Decouple the protocol code from the app configuration ../waku_message logScope: diff --git a/waku/waku.nim b/waku/waku.nim index 4f9210825..a10002cd9 100644 --- a/waku/waku.nim +++ b/waku/waku.nim @@ -5,6 +5,6 @@ # - APACHEv2 ([LICENSE-APACHEv2](../LICENSE-APACHEv2) or https://www.apache.org/licenses/LICENSE-2.0) ## An implementation of the [Waku v1](https://specs.vac.dev/specs/waku/waku.html) and [Waku v2](https://specs.vac.dev/specs/waku/v2/waku-v2.html) in nim. -import v2/node/wakunode2, v1/node/wakunode1 +import v2/node/waku_node as wakunode2, v1/node/wakunode1 export wakunode2 export wakunode1