mirror of https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00

commit 5046a4b3da (parent b139692b05)

    refactor(node): split wakunode into waku_node and wakunode2
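The commit moves the node implementation out of the `wakunode2` module into a new `waku_node` module, so downstream modules change their import path and any module-qualified names; the test suites also stop reaching `PrivateKey` through the node module. A minimal before/after sketch of a hypothetical downstream module (the `rng` setup is elided; the paths and the `ECDSA` scheme appear verbatim in the hunks below):

    # Before the split (hypothetical consumer): node symbols come from wakunode2.
    import ../../waku/v2/node/wakunode2
    let
      topic: wakunode2.Topic = "/waku/2/default-waku/proto"
      key = wakunode2.PrivateKey.random(ECDSA, rng[]).get()

    # After the split: node symbols come from waku_node, and key material
    # is taken directly from libp2p's crypto module.
    import libp2p/crypto/crypto
    import ../../waku/v2/node/waku_node
    let
      topic: waku_node.Topic = "/waku/2/default-waku/proto"
      key = crypto.PrivateKey.random(ECDSA, rng[]).get()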
@@ -26,7 +26,7 @@ import
   ../../waku/v2/protocol/waku_lightpush,
   ../../waku/v2/protocol/waku_filter,
   ../../waku/v2/protocol/waku_store,
-  ../../waku/v2/node/[wakunode2, waku_payload],
+  ../../waku/v2/node/[waku_node, waku_payload],
   ../../waku/v2/node/dnsdisc/waku_dnsdisc,
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/[peers, time],
@@ -67,7 +67,7 @@ type Chat = ref object

 type
   PrivateKey* = crypto.PrivateKey
-  Topic* = wakunode2.Topic
+  Topic* = waku_node.Topic

 #####################
 ## chat2 protobufs ##
@@ -12,7 +12,7 @@ import
   libp2p/crypto/crypto,
   libp2p/errors,
   ../../../waku/v2/protocol/waku_message,
-  ../../../waku/v2/node/wakunode2,
+  ../../../waku/v2/node/waku_node,
   # Chat 2 imports
   ../chat2/chat2,
   # Common cli config
@@ -19,7 +19,7 @@ import
   ../../waku/v2/utils/namespacing,
   ../../waku/v2/utils/time,
   ../../waku/v2/protocol/waku_message,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/peer_manager/peer_manager,
   # Common cli config
   ./config_bridge
@@ -51,7 +51,7 @@ type
   WakuBridge* = ref object of RootObj
     nodev1*: EthereumNode
     nodev2*: WakuNode
-    nodev2PubsubTopic: wakunode2.Topic # Pubsub topic to bridge to/from
+    nodev2PubsubTopic: waku_node.Topic # Pubsub topic to bridge to/from
     seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication.
     rng: ref HmacDrbgContext
     v1Pool: seq[Node] # Pool of v1 nodes for possible connections
@@ -228,7 +228,7 @@ proc new*(T: type WakuBridge,
           nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
           nameResolver: NameResolver = nil,
           # Bridge configuration
-          nodev2PubsubTopic: wakunode2.Topic,
+          nodev2PubsubTopic: waku_node.Topic,
           v1Pool: seq[Node] = @[],
           targetV1Peers = 0): T
   {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} =
@@ -8,7 +8,7 @@ import
   libp2p/crypto/[crypto,secp],
   eth/keys,
   json_rpc/[rpcclient, rpcserver],
-  ../../waku/v2/node/[config, wakunode2],
+  ../../waku/v2/node/[config, waku_node],
   ../../waku/common/utils/nat,
   ../../waku/v2/protocol/waku_message

@@ -15,7 +15,7 @@ import
 import
   ../../waku/v1/node/rpc/hexstrings,
   ../../waku/v2/node/storage/message/waku_store_queue,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/jsonrpc/[store_api,
                               relay_api,
                               debug_api,
@@ -231,7 +231,7 @@ procSuite "Waku v2 JSON-RPC API":

     # WakuStore setup
     let
-      key = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      key = crypto.PrivateKey.random(ECDSA, rng[]).get()
       peer = PeerInfo.new(key)

     await node.mountStore(store=StoreQueueRef.new())
@@ -536,13 +536,13 @@ procSuite "Waku v2 JSON-RPC API":
     let
       locationAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()

-      filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       filterPeer = PeerInfo.new(filterKey, @[locationAddr])

-      swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       swapPeer = PeerInfo.new(swapKey, @[locationAddr])

-      storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       storePeer = PeerInfo.new(storeKey, @[locationAddr])

     node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
@@ -10,7 +10,7 @@ import
   libp2p/crypto/crypto,
   libp2p/protocols/pubsub/gossipsub
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/utils/peers,
   ../test_helpers

@@ -20,7 +20,7 @@ import
   ../../waku/v2/protocol/waku_swap/waku_swap,
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/node/storage/peer/waku_peer_storage,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../test_helpers

 procSuite "Peer Manager":
@@ -89,15 +89,15 @@ procSuite "Peer Manager":
                           Port(60000))
       # Create filter peer
       filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
-      filterKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       filterPeer = PeerInfo.new(filterKey, @[filterLoc])
       # Create swap peer
       swapLoc = MultiAddress.init("/ip4/127.0.0.2/tcp/2").tryGet()
-      swapKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      swapKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       swapPeer = PeerInfo.new(swapKey, @[swapLoc])
       # Create store peer
       storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
-      storeKey = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
+      storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
       storePeer = PeerInfo.new(storeKey, @[storeLoc])

     await node.start()
@@ -9,7 +9,7 @@ import
   libp2p/multiaddress,
   libp2p/crypto/crypto
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/rest/[server, client, utils],
   ../../waku/v2/node/rest/debug/debug_api

@@ -11,7 +11,7 @@ import
   libp2p/protocols/pubsub/pubsub
 import
   ../../waku/v2/protocol/waku_message,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/rest/[server, client, base64, utils],
   ../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache]

@@ -16,7 +16,7 @@ import
 import
   ../../waku/v1/protocol/waku_protocol,
   ../../waku/v2/protocol/waku_message,
-  ../../waku/v2/node/[wakunode2, waku_payload],
+  ../../waku/v2/node/[waku_node, waku_payload],
   ../../waku/v2/utils/peers,
   ../../apps/wakubridge/wakubridge,
   ../test_helpers
@@ -12,7 +12,7 @@ import
   eth/p2p/discoveryv5/enr,
   ../../waku/v2/protocol/waku_message,
   ../../waku/v2/node/discv5/waku_discv5,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../test_helpers

 procSuite "Waku Discovery v5":
@@ -13,7 +13,7 @@ import
 import
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/node/dnsdisc/waku_dnsdisc,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../test_helpers

 procSuite "Waku DNS Discovery":
@@ -10,7 +10,7 @@ import
   libp2p/stream/[bufferstream, connection],
   libp2p/crypto/crypto,
   libp2p/multistream,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/utils/peers,
   ../test_helpers, ./utils

@@ -12,7 +12,7 @@ import
   eth/keys,
   eth/p2p/discoveryv5/enr
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/node/discv5/waku_discv5,
   ../../waku/v2/protocol/waku_peer_exchange,
@@ -13,7 +13,7 @@ import
     waku_rln_relay_types,
     waku_rln_relay_constants,
     waku_rln_relay_metrics],
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../test_helpers

 const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto"
@@ -12,7 +12,7 @@ import
     waku_rln_relay_constants,
     waku_rln_relay_types,
     rln_relay_contract],
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../test_helpers,
   ./test_utils

@@ -18,7 +18,7 @@ import
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/protocol/waku_swap/waku_swap,
   ../../waku/v2/node/storage/message/waku_store_queue,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/utils/peers,
   ../../waku/v2/utils/time,
   ../test_helpers,
@@ -14,7 +14,7 @@ import
   ../../waku/v2/protocol/[waku_relay, waku_message],
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node


 procSuite "WakuNode":
@@ -19,7 +19,7 @@ import
   ../../waku/v2/protocol/waku_filter,
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node


 procSuite "WakuNode - Filter":
@@ -13,7 +13,7 @@ import
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
   ../../waku/v2/utils/time,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node

 from std/times import getTime, toUnixFloat

@@ -19,7 +19,7 @@ import
   ../../waku/v2/protocol/[waku_relay, waku_message],
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node


 template sourceDir: string = currentSourcePath.parentDir()
@@ -20,7 +20,7 @@ import
     waku_rln_relay_constants],
   ../../waku/v2/protocol/[waku_relay, waku_message],
   ../../waku/v2/utils/peers,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node

 from std/times import epochTime

@@ -23,7 +23,7 @@ import
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
   ../../waku/v2/utils/time,
-  ../../waku/v2/node/wakunode2
+  ../../waku/v2/node/waku_node

 from std/times import getTime, toUnixFloat

@@ -6,7 +6,7 @@ import
   system,
   options
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/jsonrpc_types,
   ../../waku/v2/protocol/waku_filter,
@@ -6,7 +6,7 @@ import
   system,
   options
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/jsonrpc_types,
   ../../waku/v2/protocol/waku_filter,
@@ -6,7 +6,7 @@ import
   system,
   options
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/jsonrpc_types,
   ../../waku/v2/protocol/waku_filter,
@@ -5,7 +5,7 @@ import
   system,
   options
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/jsonrpc_types,
   ../../waku/v2/protocol/waku_filter,
@@ -6,7 +6,7 @@ import
   system,
   options
 import
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/jsonrpc_types,
   ../../waku/v2/protocol/waku_filter,
@@ -10,7 +10,7 @@ import
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/protocol/waku_message,
   ../../waku/v2/utils/time,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/node/jsonrpc/[jsonrpc_types,jsonrpc_utils]

@@ -10,7 +10,7 @@ import
 import
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
-  ../../waku/v2/node/wakunode2,
+  ../../waku/v2/node/waku_node,
   ../../waku/v2/node/waku_payload,
   ../../waku/v2/utils/peers

@@ -6,13 +6,12 @@ import
   json_rpc/rpcserver,
   libp2p/[peerinfo, switch]
 import
-  ../../protocol/waku_message,
   ../../protocol/waku_store,
   ../../protocol/waku_filter,
   ../../protocol/waku_relay,
   ../../protocol/waku_swap/waku_swap,
   ../peer_manager/peer_manager,
-  ../wakunode2,
+  ../waku_node,
   ./jsonrpc_types

 export jsonrpc_types
@@ -3,7 +3,7 @@
 import
   chronicles,
   json_rpc/rpcserver,
-  ../wakunode2
+  ../waku_node

 logScope:
   topics = "debug api"
@@ -7,7 +7,7 @@ import
 import
   ../../protocol/waku_message,
   ../../protocol/waku_filter,
-  ../wakunode2,
+  ../waku_node,
   ./jsonrpc_types

 export jsonrpc_types
@@ -6,7 +6,7 @@ import
   eth/keys,
   json_rpc/rpcserver,
   nimcrypto/sysrand,
-  ../wakunode2,
+  ../waku_node,
   ../waku_payload,
   ./jsonrpc_types,
   ./jsonrpc_utils
@@ -6,7 +6,7 @@ import
   json_rpc/rpcserver,
   libp2p/protocols/pubsub/pubsub,
   ../../protocol/waku_message,
-  ../wakunode2,
+  ../waku_node,
   ./jsonrpc_types,
   ./jsonrpc_utils

@@ -5,7 +5,7 @@ import
   chronicles,
   json_rpc/rpcserver
 import
-  ../wakunode2,
+  ../waku_node,
   ../../protocol/waku_store,
   ../../utils/time,
   ./jsonrpc_types,
@@ -5,7 +5,7 @@ import
   json_serialization,
   json_serialization/std/options
 import ".."/serdes
-import ../../wakunode2
+import ../../waku_node

 #### Types

@@ -8,7 +8,7 @@ import
   presto/[route, client]
 import "."/api_types
 import ".."/[serdes, utils]
-import ../../wakunode2
+import ../../waku_node

 logScope: topics = "rest_api_debug"

@@ -8,7 +8,7 @@ import
   json_serialization/std/options,
   presto/[route, client, common]
 import
-  ../../wakunode2,
+  ../../waku_node,
   ../serdes,
   ../utils,
   ./api_types,
@@ -1,7 +1,7 @@
 {.push raises: [Defect].}

 import
-  std/[options, times],
+  std/options,
   stew/results,
   chronicles
 import
830
waku/v2/node/waku_node.nim
Normal file
830
waku/v2/node/waku_node.nim
Normal file
@ -0,0 +1,830 @@
|
|||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[hashes, options, tables, strutils, sequtils, os],
|
||||||
|
chronos, chronicles, metrics,
|
||||||
|
stew/shims/net as stewNet,
|
||||||
|
stew/byteutils,
|
||||||
|
eth/keys,
|
||||||
|
nimcrypto,
|
||||||
|
bearssl/rand,
|
||||||
|
eth/p2p/discoveryv5/enr,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/protocols/ping,
|
||||||
|
libp2p/protocols/pubsub/[gossipsub, rpc/messages],
|
||||||
|
libp2p/nameresolving/nameresolver,
|
||||||
|
libp2p/[builders, multihash],
|
||||||
|
libp2p/transports/[transport, tcptransport, wstransport]
|
||||||
|
import
|
||||||
|
../protocol/[waku_relay, waku_message],
|
||||||
|
../protocol/waku_store,
|
||||||
|
../protocol/waku_swap/waku_swap,
|
||||||
|
../protocol/waku_filter,
|
||||||
|
../protocol/waku_lightpush,
|
||||||
|
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||||
|
../protocol/waku_peer_exchange,
|
||||||
|
../utils/[peers, requests, wakuenr],
|
||||||
|
./peer_manager/peer_manager,
|
||||||
|
./storage/message/waku_store_queue,
|
||||||
|
./storage/message/message_retention_policy,
|
||||||
|
./storage/message/message_retention_policy_capacity,
|
||||||
|
./storage/message/message_retention_policy_time,
|
||||||
|
./dnsdisc/waku_dnsdisc,
|
||||||
|
./discv5/waku_discv5,
|
||||||
|
./wakuswitch
|
||||||
|
|
||||||
|
declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"]
|
||||||
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
|
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
||||||
|
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakunode"
|
||||||
|
|
||||||
|
# Git version in git describe format (defined compile time)
|
||||||
|
const git_version* {.strdefine.} = "n/a"
|
||||||
|
|
||||||
|
# Default clientId
|
||||||
|
const clientId* = "Nimbus Waku v2 node"
|
||||||
|
|
||||||
|
# Default topic
|
||||||
|
const defaultTopic* = "/waku/2/default-waku/proto"
|
||||||
|
|
||||||
|
# Default Waku Filter Timeout
|
||||||
|
const WakuFilterTimeout: Duration = 1.days
|
||||||
|
|
||||||
|
|
||||||
|
# key and crypto modules different
|
||||||
|
type
|
||||||
|
# XXX: Weird type, should probably be using pubsub Topic object name?
|
||||||
|
Topic* = string
|
||||||
|
Message* = seq[byte]
|
||||||
|
|
||||||
|
WakuInfo* = object
|
||||||
|
# NOTE One for simplicity, can extend later as needed
|
||||||
|
listenAddresses*: seq[string]
|
||||||
|
enrUri*: string
|
||||||
|
#multiaddrStrings*: seq[string]
|
||||||
|
|
||||||
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
|
WakuNode* = ref object of RootObj
|
||||||
|
peerManager*: PeerManager
|
||||||
|
switch*: Switch
|
||||||
|
wakuRelay*: WakuRelay
|
||||||
|
wakuStore*: WakuStore
|
||||||
|
wakuFilter*: WakuFilter
|
||||||
|
wakuSwap*: WakuSwap
|
||||||
|
wakuRlnRelay*: WakuRLNRelay
|
||||||
|
wakuLightPush*: WakuLightPush
|
||||||
|
wakuPeerExchange*: WakuPeerExchange
|
||||||
|
enr*: enr.Record
|
||||||
|
libp2pPing*: Ping
|
||||||
|
filters*: Filters
|
||||||
|
rng*: ref rand.HmacDrbgContext
|
||||||
|
wakuDiscv5*: WakuDiscoveryV5
|
||||||
|
announcedAddresses* : seq[MultiAddress]
|
||||||
|
started*: bool # Indicates that node has started listening
|
||||||
|
|
||||||
|
|
||||||
|
proc protocolMatcher(codec: string): Matcher =
|
||||||
|
## Returns a protocol matcher function for the provided codec
|
||||||
|
proc match(proto: string): bool {.gcsafe.} =
|
||||||
|
## Matches a proto with any postfix to the provided codec.
|
||||||
|
## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
|
||||||
|
## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
|
||||||
|
return proto.startsWith(codec)
|
||||||
|
|
||||||
|
return match
|
||||||
|
|
||||||
|
proc updateSwitchPeerInfo(node: WakuNode) =
|
||||||
|
## TODO: remove this when supported upstream
|
||||||
|
##
|
||||||
|
## nim-libp2p does not yet support announcing addrs
|
||||||
|
## different from bound addrs.
|
||||||
|
##
|
||||||
|
## This is a temporary workaround to replace
|
||||||
|
## peer info addrs in switch to announced
|
||||||
|
## addresses.
|
||||||
|
##
|
||||||
|
## WARNING: this should only be called once the switch
|
||||||
|
## has already been started.
|
||||||
|
|
||||||
|
if node.announcedAddresses.len > 0:
|
||||||
|
node.switch.peerInfo.addrs = node.announcedAddresses
|
||||||
|
|
||||||
|
template ip4TcpEndPoint(address, port): MultiAddress =
|
||||||
|
MultiAddress.init(address, tcpProtocol, port)
|
||||||
|
|
||||||
|
template dns4Ma(dns4DomainName: string): MultiAddress =
|
||||||
|
MultiAddress.init("/dns4/" & dns4DomainName).tryGet()
|
||||||
|
|
||||||
|
template tcpPortMa(port: Port): MultiAddress =
|
||||||
|
MultiAddress.init("/tcp/" & $port).tryGet()
|
||||||
|
|
||||||
|
template dns4TcpEndPoint(dns4DomainName: string, port: Port): MultiAddress =
|
||||||
|
dns4Ma(dns4DomainName) & tcpPortMa(port)
|
||||||
|
|
||||||
|
template wsFlag(wssEnabled: bool): MultiAddress =
|
||||||
|
if wssEnabled: MultiAddress.init("/wss").tryGet()
|
||||||
|
else: MultiAddress.init("/ws").tryGet()
|
||||||
|
|
||||||
|
proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||||
|
bindIp: ValidIpAddress, bindPort: Port,
|
||||||
|
extIp = none(ValidIpAddress), extPort = none(Port),
|
||||||
|
peerStorage: PeerStorage = nil,
|
||||||
|
maxConnections = builders.MaxConnections,
|
||||||
|
wsBindPort: Port = (Port)8000,
|
||||||
|
wsEnabled: bool = false,
|
||||||
|
wssEnabled: bool = false,
|
||||||
|
secureKey: string = "",
|
||||||
|
secureCert: string = "",
|
||||||
|
wakuFlags = none(WakuEnrBitfield),
|
||||||
|
nameResolver: NameResolver = nil,
|
||||||
|
sendSignedPeerRecord = false,
|
||||||
|
dns4DomainName = none(string),
|
||||||
|
discv5UdpPort = none(Port)
|
||||||
|
): T
|
||||||
|
{.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||||
|
## Creates a Waku Node.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
##
|
||||||
|
|
||||||
|
## Initialize addresses
|
||||||
|
let
|
||||||
|
# Bind addresses
|
||||||
|
hostAddress = ip4TcpEndPoint(bindIp, bindPort)
|
||||||
|
wsHostAddress = if wsEnabled or wssEnabled: some(ip4TcpEndPoint(bindIp, wsbindPort) & wsFlag(wssEnabled))
|
||||||
|
else: none(MultiAddress)
|
||||||
|
|
||||||
|
# Setup external addresses, if available
|
||||||
|
var
|
||||||
|
hostExtAddress, wsExtAddress = none(MultiAddress)
|
||||||
|
|
||||||
|
if (dns4DomainName.isSome()):
|
||||||
|
# Use dns4 for externally announced addresses
|
||||||
|
hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get()))
|
||||||
|
|
||||||
|
if (wsHostAddress.isSome()):
|
||||||
|
wsExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), wsBindPort) & wsFlag(wssEnabled))
|
||||||
|
else:
|
||||||
|
# No public domain name, use ext IP if available
|
||||||
|
if extIp.isSome() and extPort.isSome():
|
||||||
|
hostExtAddress = some(ip4TcpEndPoint(extIp.get(), extPort.get()))
|
||||||
|
|
||||||
|
if (wsHostAddress.isSome()):
|
||||||
|
wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled))
|
||||||
|
|
||||||
|
var announcedAddresses: seq[MultiAddress]
|
||||||
|
if hostExtAddress.isSome:
|
||||||
|
announcedAddresses.add(hostExtAddress.get())
|
||||||
|
else:
|
||||||
|
announcedAddresses.add(hostAddress) # We always have at least a bind address for the host
|
||||||
|
|
||||||
|
if wsExtAddress.isSome:
|
||||||
|
announcedAddresses.add(wsExtAddress.get())
|
||||||
|
elif wsHostAddress.isSome:
|
||||||
|
announcedAddresses.add(wsHostAddress.get())
|
||||||
|
|
||||||
|
## Initialize peer
|
||||||
|
let
|
||||||
|
rng = crypto.newRng()
|
||||||
|
enrIp = if extIp.isSome(): extIp
|
||||||
|
else: some(bindIp)
|
||||||
|
enrTcpPort = if extPort.isSome(): extPort
|
||||||
|
else: some(bindPort)
|
||||||
|
enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field
|
||||||
|
elif wsHostAddress.isSome: @[wsHostAddress.get()]
|
||||||
|
else: @[]
|
||||||
|
enr = initEnr(nodeKey,
|
||||||
|
enrIp,
|
||||||
|
enrTcpPort,
|
||||||
|
discv5UdpPort,
|
||||||
|
wakuFlags,
|
||||||
|
enrMultiaddrs)
|
||||||
|
|
||||||
|
info "Initializing networking", addrs=announcedAddresses
|
||||||
|
|
||||||
|
var switch = newWakuSwitch(some(nodekey),
|
||||||
|
hostAddress,
|
||||||
|
wsHostAddress,
|
||||||
|
transportFlags = {ServerFlags.ReuseAddr},
|
||||||
|
rng = rng,
|
||||||
|
maxConnections = maxConnections,
|
||||||
|
wssEnabled = wssEnabled,
|
||||||
|
secureKeyPath = secureKey,
|
||||||
|
secureCertPath = secureCert,
|
||||||
|
nameResolver = nameResolver,
|
||||||
|
sendSignedPeerRecord = sendSignedPeerRecord)
|
||||||
|
|
||||||
|
let wakuNode = WakuNode(
|
||||||
|
peerManager: PeerManager.new(switch, peerStorage),
|
||||||
|
switch: switch,
|
||||||
|
rng: rng,
|
||||||
|
enr: enr,
|
||||||
|
filters: Filters.init(),
|
||||||
|
announcedAddresses: announcedAddresses
|
||||||
|
)
|
||||||
|
|
||||||
|
return wakuNode
|
||||||
|
|
||||||
|
proc peerInfo*(node: WakuNode): PeerInfo =
|
||||||
|
node.switch.peerInfo
|
||||||
|
|
||||||
|
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
|
info "subscribe", topic=topic
|
||||||
|
|
||||||
|
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
# A default handler should be registered for all topics
|
||||||
|
trace "Hit default handler", topic=topic, data=data
|
||||||
|
|
||||||
|
let msg = WakuMessage.init(data)
|
||||||
|
if msg.isErr():
|
||||||
|
# TODO: Add metric to track waku message decode errors
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
# Notify mounted protocols of new message
|
||||||
|
if not node.wakuFilter.isNil():
|
||||||
|
await node.wakuFilter.handleMessage(topic, msg.value)
|
||||||
|
|
||||||
|
if not node.wakuStore.isNil():
|
||||||
|
node.wakuStore.handleMessage(topic, msg.value)
|
||||||
|
|
||||||
|
waku_node_messages.inc(labelValues = ["relay"])
|
||||||
|
|
||||||
|
|
||||||
|
let wakuRelay = node.wakuRelay
|
||||||
|
|
||||||
|
if topic notin PubSub(wakuRelay).topics:
|
||||||
|
# Add default handler only for new topics
|
||||||
|
debug "Registering default handler", topic=topic
|
||||||
|
wakuRelay.subscribe(topic, defaultHandler)
|
||||||
|
|
||||||
|
if handler.isSome:
|
||||||
|
debug "Registering handler", topic=topic
|
||||||
|
wakuRelay.subscribe(topic, handler.get())
|
||||||
|
|
||||||
|
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
||||||
|
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
|
||||||
|
## this topic. TopicHandler is a method that takes a topic and some data.
|
||||||
|
##
|
||||||
|
## NOTE The data field SHOULD be decoded as a WakuMessage.
|
||||||
|
## Status: Implemented.
|
||||||
|
node.subscribe(topic, some(handler))
|
||||||
|
|
||||||
|
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
|
||||||
|
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
|
||||||
|
## FilterHandler is a method that takes a MessagePush.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
|
||||||
|
# Sanity check for well-formed subscribe FilterRequest
|
||||||
|
doAssert(request.subscribe, "invalid subscribe request")
|
||||||
|
|
||||||
|
info "subscribe content", filter=request
|
||||||
|
|
||||||
|
var id = generateRequestId(node.rng)
|
||||||
|
|
||||||
|
if node.wakuFilter.isNil == false:
|
||||||
|
let
|
||||||
|
pubsubTopic = request.pubsubTopic
|
||||||
|
contentTopics = request.contentFilters.mapIt(it.contentTopic)
|
||||||
|
let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)
|
||||||
|
|
||||||
|
if resSubscription.isOk():
|
||||||
|
id = resSubscription.get()
|
||||||
|
else:
|
||||||
|
# Failed to subscribe
|
||||||
|
error "remote subscription to filter failed", filter = request
|
||||||
|
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||||
|
|
||||||
|
# Register handler for filter, whether remote subscription succeeded or not
|
||||||
|
node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
|
||||||
|
waku_node_filters.set(node.filters.len.int64)
|
||||||
|
|
||||||
|
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
||||||
|
## Unsubscribes a handler from a PubSub topic.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
|
info "unsubscribe", topic=topic
|
||||||
|
|
||||||
|
let wakuRelay = node.wakuRelay
|
||||||
|
wakuRelay.unsubscribe(@[(topic, handler)])
|
||||||
|
|
||||||
|
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
|
||||||
|
## Unsubscribes all handlers registered on a specific PubSub topic.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
|
info "unsubscribeAll", topic=topic
|
||||||
|
|
||||||
|
let wakuRelay = node.wakuRelay
|
||||||
|
wakuRelay.unsubscribeAll(topic)
|
||||||
|
|
||||||
|
|
||||||
|
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
|
||||||
|
## Unsubscribe from a content filter.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
|
||||||
|
# Sanity check for well-formed unsubscribe FilterRequest
|
||||||
|
doAssert(request.subscribe == false, "invalid unsubscribe request")
|
||||||
|
|
||||||
|
info "unsubscribe content", filter=request
|
||||||
|
|
||||||
|
let
|
||||||
|
pubsubTopic = request.pubsubTopic
|
||||||
|
contentTopics = request.contentFilters.mapIt(it.contentTopic)
|
||||||
|
discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
|
||||||
|
node.filters.removeContentFilters(request.contentFilters)
|
||||||
|
|
||||||
|
waku_node_filters.set(node.filters.len.int64)
|
||||||
|
|
||||||
|
|
||||||
|
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
|
||||||
|
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||||
|
## `contentTopic` field for light node functionality. This field may be also
|
||||||
|
## be omitted.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
|
||||||
|
# @TODO improved error handling
|
||||||
|
return
|
||||||
|
|
||||||
|
let wakuRelay = node.wakuRelay
|
||||||
|
trace "publish", topic=topic, contentTopic=message.contentTopic
|
||||||
|
var publishingMessage = message
|
||||||
|
|
||||||
|
let data = message.encode().buffer
|
||||||
|
|
||||||
|
discard await wakuRelay.publish(topic, data)
|
||||||
|
|
||||||
|
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
|
||||||
|
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
|
||||||
|
## Returns whether relaying was successful or not.
|
||||||
|
## `WakuMessage` should contain a `contentTopic` field for light node
|
||||||
|
## functionality.
|
||||||
|
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
|
||||||
|
|
||||||
|
let rpc = PushRequest(pubSubTopic: topic, message: message)
|
||||||
|
return await node.wakuLightPush.request(rpc)
|
||||||
|
|
||||||
|
proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
|
||||||
|
discard await node.lightpush(topic, message)
|
||||||
|
|
||||||
|
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||||
|
## Queries known nodes for historical messages
|
||||||
|
|
||||||
|
# TODO: Once waku swap is less experimental, this can simplified
|
||||||
|
if node.wakuSwap.isNil:
|
||||||
|
debug "Using default query"
|
||||||
|
return await node.wakuStore.query(query)
|
||||||
|
else:
|
||||||
|
debug "Using SWAP accounting query"
|
||||||
|
# TODO: wakuSwap now part of wakuStore object
|
||||||
|
return await node.wakuStore.queryWithAccounting(query)
|
||||||
|
|
||||||
|
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
|
||||||
|
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
|
||||||
|
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
|
||||||
|
## messages are stored in the the wakuStore's messages field and in the message db
|
||||||
|
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||||
|
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||||
|
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||||
|
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||||
|
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||||
|
if node.wakuStore.isNil():
|
||||||
|
return
|
||||||
|
|
||||||
|
let retrievedMessages = await node.wakuStore.resume(peerList)
|
||||||
|
if retrievedMessages.isErr():
|
||||||
|
error "failed to resume store", error=retrievedMessages.error
|
||||||
|
return
|
||||||
|
|
||||||
|
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||||
|
|
||||||
|
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||||
|
proc info*(node: WakuNode): WakuInfo =
|
||||||
|
## Returns information about the Node, such as what multiaddress it can be reached at.
|
||||||
|
##
|
||||||
|
## Status: Implemented.
|
||||||
|
##
|
||||||
|
|
||||||
|
let peerInfo = node.switch.peerInfo
|
||||||
|
|
||||||
|
var listenStr : seq[string]
|
||||||
|
for address in node.announcedAddresses:
|
||||||
|
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
|
||||||
|
listenStr &= fulladdr
|
||||||
|
let enrUri = node.enr.toUri()
|
||||||
|
let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
|
||||||
|
return wakuInfo
|
||||||
|
|
||||||
|
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
|
||||||
|
info "mounting filter"
|
||||||
|
proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} =
|
||||||
|
|
||||||
|
info "push received"
|
||||||
|
for message in msg.messages:
|
||||||
|
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||||
|
|
||||||
|
if not node.wakuStore.isNil and (requestId in node.filters):
|
||||||
|
let pubSubTopic = node.filters[requestId].pubSubTopic
|
||||||
|
node.wakuStore.handleMessage(pubSubTopic, message)
|
||||||
|
|
||||||
|
waku_node_messages.inc(labelValues = ["filter"])
|
||||||
|
|
||||||
|
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start filter too.
|
||||||
|
await node.wakuFilter.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: If using the swap protocol, it must be mounted before store. This is
|
||||||
|
# because store is using a reference to the swap protocol.
|
||||||
|
proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.async, raises: [Defect, LPError].} =
|
||||||
|
info "mounting swap", mode = $swapConfig.mode
|
||||||
|
|
||||||
|
node.wakuSwap = WakuSwap.init(node.peerManager, node.rng, swapConfig)
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start swap too.
|
||||||
|
await node.wakuSwap.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
|
||||||
|
|
||||||
|
|
||||||
|
const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes
|
||||||
|
|
||||||
|
proc executeMessageRetentionPolicy*(node: WakuNode) =
|
||||||
|
if node.wakuStore.isNil():
|
||||||
|
return
|
||||||
|
|
||||||
|
if node.wakuStore.store.isNil():
|
||||||
|
return
|
||||||
|
|
||||||
|
debug "executing message retention policy"
|
||||||
|
|
||||||
|
node.wakuStore.executeMessageRetentionPolicy()
|
||||||
|
node.wakuStore.reportStoredMessagesMetric()
|
||||||
|
|
||||||
|
proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) =
|
||||||
|
if node.wakuStore.isNil():
|
||||||
|
return
|
||||||
|
|
||||||
|
if node.wakuStore.store.isNil():
|
||||||
|
return
|
||||||
|
|
||||||
|
# https://github.com/nim-lang/Nim/issues/17369
|
||||||
|
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
||||||
|
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
|
||||||
|
executeMessageRetentionPolicy(node)
|
||||||
|
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
|
||||||
|
|
||||||
|
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
|
||||||
|
|
||||||
|
proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
|
||||||
|
if node.wakuSwap.isNil():
|
||||||
|
info "mounting waku store protocol (no waku swap)"
|
||||||
|
else:
|
||||||
|
info "mounting waku store protocol with waku swap support"
|
||||||
|
|
||||||
|
node.wakuStore = WakuStore.init(
|
||||||
|
node.peerManager,
|
||||||
|
node.rng,
|
||||||
|
store,
|
||||||
|
wakuSwap=node.wakuSwap,
|
||||||
|
retentionPolicy=retentionPolicy
|
||||||
|
)
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start store too.
|
||||||
|
await node.wakuStore.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
|
||||||
|
|
||||||
|
|
||||||
|
proc startRelay*(node: WakuNode) {.async.} =
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
trace "Failed to start relay. Not mounted."
|
||||||
|
return
|
||||||
|
|
||||||
|
## Setup and start relay protocol
|
||||||
|
info "starting relay"
|
||||||
|
|
||||||
|
# Topic subscriptions
|
||||||
|
for topic in node.wakuRelay.defaultTopics:
|
||||||
|
node.subscribe(topic, none(TopicHandler))
|
||||||
|
|
||||||
|
# Resume previous relay connections
|
||||||
|
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||||
|
info "Found previous WakuRelay peers. Reconnecting."
|
||||||
|
|
||||||
|
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||||
|
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
|
||||||
|
|
||||||
|
await node.peerManager.reconnectPeers(WakuRelayCodec,
|
||||||
|
protocolMatcher(WakuRelayCodec),
|
||||||
|
backoffPeriod)
|
||||||
|
|
||||||
|
# Start the WakuRelay protocol
|
||||||
|
await node.wakuRelay.start()
|
||||||
|
|
||||||
|
info "relay started successfully"
|
||||||
|
|
||||||
|
proc mountRelay*(node: WakuNode,
|
||||||
|
topics: seq[string] = newSeq[string](),
|
||||||
|
triggerSelf = true,
|
||||||
|
peerExchangeHandler = none(RoutingRecordsHandler))
|
||||||
|
# @TODO: Better error handling: CatchableError is raised by `waitFor`
|
||||||
|
{.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
|
||||||
|
|
||||||
|
proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] =
|
||||||
|
let mh = MultiHash.digest("sha2-256", m.data)
|
||||||
|
if mh.isOk():
|
||||||
|
return ok(mh[].data.buffer)
|
||||||
|
else:
|
||||||
|
return ok(($m.data.hash).toBytes())
|
||||||
|
|
||||||
|
let wakuRelay = WakuRelay.init(
|
||||||
|
switch = node.switch,
|
||||||
|
msgIdProvider = msgIdProvider,
|
||||||
|
triggerSelf = triggerSelf,
|
||||||
|
sign = false,
|
||||||
|
verifySignature = false,
|
||||||
|
maxMessageSize = MaxWakuMessageSize
|
||||||
|
)
|
||||||
|
|
||||||
|
info "mounting relay"
|
||||||
|
|
||||||
|
## The default relay topics is the union of
|
||||||
|
## all configured topics plus the hard-coded defaultTopic(s)
|
||||||
|
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
|
||||||
|
|
||||||
|
## Add peer exchange handler
|
||||||
|
if peerExchangeHandler.isSome():
|
||||||
|
wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p
|
||||||
|
wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
|
||||||
|
|
||||||
|
node.wakuRelay = wakuRelay
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start relay too.
|
||||||
|
await node.startRelay()
|
||||||
|
|
||||||
|
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
|
||||||
|
|
||||||
|
info "relay mounted successfully"
|
||||||
|
|
||||||
|
proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||||
|
info "mounting light push"
|
||||||
|
|
||||||
|
if node.wakuRelay.isNil:
|
||||||
|
debug "mounting lightpush without relay"
|
||||||
|
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
|
||||||
|
else:
|
||||||
|
debug "mounting lightpush with relay"
|
||||||
|
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start lightpush too.
|
||||||
|
await node.wakuLightPush.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
||||||
|
|
||||||
|
proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||||
|
info "mounting waku peer exchange"
|
||||||
|
|
||||||
|
var discv5Opt: Option[WakuDiscoveryV5]
|
||||||
|
if not node.wakuDiscV5.isNil():
|
||||||
|
discv5Opt = some(node.wakuDiscV5)
|
||||||
|
node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt)
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start Waku peer exchange too.
|
||||||
|
await node.wakuPeerExchange.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
|
||||||
|
|
||||||
|
proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||||
|
info "mounting libp2p ping protocol"
|
||||||
|
|
||||||
|
try:
|
||||||
|
node.libp2pPing = Ping.new(rng = node.rng)
|
||||||
|
except Exception as e:
|
||||||
|
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
|
||||||
|
# @TODO: remove exception handling once explicit `raises` in ping module
|
||||||
|
raise newException(LPError, "Failed to initialize ping protocol")
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
# Node has started already. Let's start ping too.
|
||||||
|
await node.libp2pPing.start()
|
||||||
|
|
||||||
|
node.switch.mount(node.libp2pPing)
|
||||||
|
|
||||||
|
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||||
|
while node.started:
|
||||||
|
# Keep all connected peers alive while running
|
||||||
|
trace "Running keepalive"
|
||||||
|
|
||||||
|
# First get a list of connected peer infos
|
||||||
|
let peers = node.peerManager.peers()
|
||||||
|
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
|
||||||
|
.mapIt(it.toRemotePeerInfo())
|
||||||
|
|
||||||
|
# Attempt to retrieve and ping the active outgoing connection for each peer
|
||||||
|
for peer in peers:
|
||||||
|
let connOpt = await node.peerManager.dialPeer(peer, PingCodec)
|
||||||
|
|
||||||
|
if connOpt.isNone:
|
||||||
|
# @TODO more sophisticated error handling here
|
||||||
|
debug "failed to connect to remote peer", peer=peer
|
||||||
|
waku_node_errors.inc(labelValues = ["keep_alive_failure"])
|
||||||
|
return
|
||||||
|
|
||||||
|
discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
|
||||||
|
|
||||||
|
await sleepAsync(keepalive)
|
||||||
|
|
||||||
|
proc startKeepalive*(node: WakuNode) =
|
||||||
|
let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration
|
||||||
|
|
||||||
|
info "starting keepalive", keepalive=defaultKeepalive
|
||||||
|
|
||||||
|
asyncSpawn node.keepaliveLoop(defaultKeepalive)
|
||||||
|
|
||||||
|
proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) =
|
||||||
|
n.wakuStore.setPeer(peer)
|
||||||
|
|
||||||
|
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||||
|
info "Set store peer", address = address
|
||||||
|
|
||||||
|
let peer = parseRemotePeerInfo(address)
|
||||||
|
n.setStorePeer(peer)
|
||||||
|
|
||||||
|
proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) =
|
||||||
|
n.wakuFilter.setPeer(peer)
|
||||||
|
|
||||||
|
proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||||
|
info "Set filter peer", address = address
|
||||||
|
|
||||||
|
  let peer = parseRemotePeerInfo(address)
  n.setFilterPeer(peer)

proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) =
  n.wakuLightPush.setPeer(peer)

proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set lightpush peer", address = address

  let peer = parseRemotePeerInfo(address)
  n.wakuLightPush.setPeer(peer)

proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set peer exchange peer", address = address

  let remotePeer = parseRemotePeerInfo(address)
  n.wakuPeerExchange.setPeer(remotePeer)

proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
  ## `source` indicates the source of the node addrs (static config, API call, discovery, etc.)
  # NOTE: This is dialing on the WakuRelay protocol specifically
  await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source)

proc runDiscv5Loop(node: WakuNode) {.async.} =
  ## Continuously add newly discovered nodes
  ## using Node Discovery v5
  if node.wakuDiscv5.isNil:
    warn "Trying to run discovery v5 while it's disabled"
    return

  info "Starting discovery loop"

  while node.wakuDiscv5.listening:
    trace "Running discovery loop"
    ## Query for a random target and collect all discovered nodes
    ## @TODO: we could filter nodes here
    let discoveredPeers = await node.wakuDiscv5.findRandomPeers()
    if discoveredPeers.isOk():
      ## Let's attempt to connect to peers we
      ## have not encountered before

      trace "Discovered peers", count=discoveredPeers.get().len()

      let newPeers = discoveredPeers.get().filterIt(
        not node.switch.isConnected(it.peerId))

      if newPeers.len > 0:
        debug "Connecting to newly discovered peers", count=newPeers.len()
        await connectToNodes(node, newPeers, "discv5")

    # Discovery `queryRandom` can have a synchronous fast path, for example
    # when no peers are in the routing table. Don't run it in a continuous loop.
    #
    # Also, give some time to dial the discovered nodes and update stats etc.
    await sleepAsync(5.seconds)

proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Start the Discovery v5 service

  info "Starting discovery v5 service"

  if not node.wakuDiscv5.isNil:
    ## First start listening on the configured port
    try:
      trace "Start listening on discv5 port"
      node.wakuDiscv5.open()
    except CatchableError:
      error "Failed to start discovery service. UDP port may be already in use"
      return false

    ## Start Discovery v5
    trace "Start discv5 service"
    node.wakuDiscv5.start()
    trace "Start discovering new peers using discv5"

    asyncSpawn node.runDiscv5Loop()

    debug "Successfully started discovery v5 service"
    info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri()
    return true

  return false

proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Stop the Discovery v5 service

  if not node.wakuDiscv5.isNil:
    info "Stopping discovery v5 service"

    ## Stop the Discovery v5 process and close the listening port
    if node.wakuDiscv5.listening:
      trace "Stop listening on discv5 port"
      await node.wakuDiscv5.closeWait()

    debug "Successfully stopped discovery v5 service"

proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku Node and
  ## all its mounted protocols.
  ##
  ## Status: Implemented.

  waku_version.set(1, labelValues=[git_version])

  ## NB: careful when moving this. We need to start the switch with the bind address
  ## BEFORE updating with announced addresses for the sake of identify.
  await node.switch.start()

  let peerInfo = node.switch.peerInfo
  info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
  var listenStr = ""
  for address in node.announcedAddresses:
    var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
    listenStr &= fulladdr

  ## XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr
  info "DNS: discoverable ENR ", enr = node.enr.toUri()

  # Perform relay-specific startup tasks. TODO: this should be rethought
  if not node.wakuRelay.isNil:
    await node.startRelay()

  ## Update switch peer info with announced addrs
  node.updateSwitchPeerInfo()

  node.started = true

  info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
  if not node.wakuRelay.isNil:
    await node.wakuRelay.stop()

  if not node.wakuDiscv5.isNil:
    discard await node.stopDiscv5()

  await node.switch.stop()

  node.started = false
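
# A minimal lifecycle sketch: construct, start and stop a node (the bind
# address and port are illustrative).
proc exampleRunNode() {.async.} =
  let
    rng = crypto.newRng()
    nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
    node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
  await node.start()
  # ...mount protocols, subscribe, publish...
  await node.stop()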

@ -1,806 +1,31 @@
{.push raises: [Defect].}

import
  std/[hashes, options, tables, strutils, sequtils, os],
  std/[options, tables, strutils, sequtils, os],
  chronos, chronicles, metrics,
  stew/shims/net as stewNet,
  stew/byteutils,
  eth/keys,
  nimcrypto,
  eth/p2p/discoveryv5/enr,
  libp2p/crypto/crypto,
  libp2p/protocols/ping,
  libp2p/protocols/pubsub/[gossipsub, rpc/messages],
  libp2p/nameresolving/nameresolver,
  libp2p/[builders, multihash],
  libp2p/transports/[transport, tcptransport, wstransport]
  libp2p/transports/[transport, wstransport]
import
  ../protocol/[waku_relay, waku_message],
  ../protocol/waku_store,
  ../protocol/waku_swap/waku_swap,
  ../protocol/waku_filter,
  ../protocol/waku_lightpush,
  ../protocol/waku_rln_relay/waku_rln_relay_types,
  ../protocol/waku_peer_exchange,
  ../utils/[peers, requests, wakuenr],
  ../utils/[peers, wakuenr],
  ./peer_manager/peer_manager,
  ./storage/message/waku_store_queue,
  ./storage/message/message_retention_policy,
  ./storage/message/message_retention_policy_capacity,
  ./storage/message/message_retention_policy_time,
  ./dnsdisc/waku_dnsdisc,
  ./discv5/waku_discv5,
  ./wakuswitch,
  ./wakunode2_types
  ./waku_node

export
  wakunode2_types

when defined(rln) or defined(rlnzerokit):
  import ../protocol/waku_rln_relay/waku_rln_relay_utils

declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"]
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]

logScope:
  topics = "wakunode"

# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"

# Default clientId
const clientId* = "Nimbus Waku v2 node"

# Default topic
const defaultTopic* = "/waku/2/default-waku/proto"

# Default Waku Filter Timeout
const WakuFilterTimeout: Duration = 1.days

proc protocolMatcher(codec: string): Matcher =
  ## Returns a protocol matcher function for the provided codec
  proc match(proto: string): bool {.gcsafe.} =
    ## Matches a proto with any postfix to the provided codec.
    ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
    ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
    return proto.startsWith(codec)

  return match
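
# A sketch of the matcher semantics above (codec strings are illustrative):
block:
  let matcher = protocolMatcher("/vac/waku/filter/2.0.0")
  doAssert matcher("/vac/waku/filter/2.0.0-beta3") # any postfix matches
  doAssert not matcher("/vac/waku/store/2.0.0")    # a different codec does not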

proc updateSwitchPeerInfo(node: WakuNode) =
  ## TODO: remove this when supported upstream
  ##
  ## nim-libp2p does not yet support announcing addrs
  ## different from bound addrs.
  ##
  ## This is a temporary workaround to replace
  ## peer info addrs in switch to announced
  ## addresses.
  ##
  ## WARNING: this should only be called once the switch
  ## has already been started.

  if node.announcedAddresses.len > 0:
    node.switch.peerInfo.addrs = node.announcedAddresses

template ip4TcpEndPoint(address, port): MultiAddress =
  MultiAddress.init(address, tcpProtocol, port)

template dns4Ma(dns4DomainName: string): MultiAddress =
  MultiAddress.init("/dns4/" & dns4DomainName).tryGet()

template tcpPortMa(port: Port): MultiAddress =
  MultiAddress.init("/tcp/" & $port).tryGet()

template dns4TcpEndPoint(dns4DomainName: string, port: Port): MultiAddress =
  dns4Ma(dns4DomainName) & tcpPortMa(port)

template wsFlag(wssEnabled: bool): MultiAddress =
  if wssEnabled: MultiAddress.init("/wss").tryGet()
  else: MultiAddress.init("/ws").tryGet()

proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
          bindIp: ValidIpAddress, bindPort: Port,
          extIp = none(ValidIpAddress), extPort = none(Port),
          peerStorage: PeerStorage = nil,
          maxConnections = builders.MaxConnections,
          wsBindPort: Port = (Port)8000,
          wsEnabled: bool = false,
          wssEnabled: bool = false,
          secureKey: string = "",
          secureCert: string = "",
          wakuFlags = none(WakuEnrBitfield),
          nameResolver: NameResolver = nil,
          sendSignedPeerRecord = false,
          dns4DomainName = none(string),
          discv5UdpPort = none(Port)
          ): T
          {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
  ## Creates a Waku Node.
  ##
  ## Status: Implemented.
  ##

  ## Initialize addresses
  let
    # Bind addresses
    hostAddress = ip4TcpEndPoint(bindIp, bindPort)
    wsHostAddress = if wsEnabled or wssEnabled: some(ip4TcpEndPoint(bindIp, wsBindPort) & wsFlag(wssEnabled))
                    else: none(MultiAddress)

  # Setup external addresses, if available
  var
    hostExtAddress, wsExtAddress = none(MultiAddress)

  if dns4DomainName.isSome():
    # Use dns4 for externally announced addresses
    hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get()))

    if wsHostAddress.isSome():
      wsExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), wsBindPort) & wsFlag(wssEnabled))
  else:
    # No public domain name, use ext IP if available
    if extIp.isSome() and extPort.isSome():
      hostExtAddress = some(ip4TcpEndPoint(extIp.get(), extPort.get()))

      if wsHostAddress.isSome():
        wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled))

  var announcedAddresses: seq[MultiAddress]
  if hostExtAddress.isSome():
    announcedAddresses.add(hostExtAddress.get())
  else:
    announcedAddresses.add(hostAddress) # We always have at least a bind address for the host

  if wsExtAddress.isSome():
    announcedAddresses.add(wsExtAddress.get())
  elif wsHostAddress.isSome():
    announcedAddresses.add(wsHostAddress.get())

  ## Initialize peer
  let
    rng = crypto.newRng()
    enrIp = if extIp.isSome(): extIp
            else: some(bindIp)
    enrTcpPort = if extPort.isSome(): extPort
                 else: some(bindPort)
    enrMultiaddrs = if wsExtAddress.isSome(): @[wsExtAddress.get()] # Only add ws/wss to the `multiaddrs` field
                    elif wsHostAddress.isSome(): @[wsHostAddress.get()]
                    else: @[]
    enr = initEnr(nodeKey,
                  enrIp,
                  enrTcpPort,
                  discv5UdpPort,
                  wakuFlags,
                  enrMultiaddrs)

  info "Initializing networking", addrs=announcedAddresses

  var switch = newWakuSwitch(some(nodekey),
                             hostAddress,
                             wsHostAddress,
                             transportFlags = {ServerFlags.ReuseAddr},
                             rng = rng,
                             maxConnections = maxConnections,
                             wssEnabled = wssEnabled,
                             secureKeyPath = secureKey,
                             secureCertPath = secureCert,
                             nameResolver = nameResolver,
                             sendSignedPeerRecord = sendSignedPeerRecord)

  let wakuNode = WakuNode(
    peerManager: PeerManager.new(switch, peerStorage),
    switch: switch,
    rng: rng,
    enr: enr,
    filters: Filters.init(),
    announcedAddresses: announcedAddresses
  )

  return wakuNode

proc peerInfo*(node: WakuNode): PeerInfo =
  node.switch.peerInfo

proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
  if node.wakuRelay.isNil:
    error "Invalid API call to `subscribe`. WakuRelay not mounted."
    # @TODO improved error handling
    return

  info "subscribe", topic=topic

  proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    # A default handler should be registered for all topics
    trace "Hit default handler", topic=topic, data=data

    let msg = WakuMessage.init(data)
    if msg.isErr():
      # TODO: Add metric to track waku message decode errors
      return

    # Notify mounted protocols of new message
    if not node.wakuFilter.isNil():
      await node.wakuFilter.handleMessage(topic, msg.value)

    if not node.wakuStore.isNil():
      node.wakuStore.handleMessage(topic, msg.value)

    waku_node_messages.inc(labelValues = ["relay"])

  let wakuRelay = node.wakuRelay

  if topic notin PubSub(wakuRelay).topics:
    # Add default handler only for new topics
    debug "Registering default handler", topic=topic
    wakuRelay.subscribe(topic, defaultHandler)

  if handler.isSome():
    debug "Registering handler", topic=topic
    wakuRelay.subscribe(topic, handler.get())

proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Subscribes to a PubSub topic. Triggers handler when receiving messages on
  ## this topic. TopicHandler is a method that takes a topic and some data.
  ##
  ## NOTE: The data field SHOULD be decoded as a WakuMessage.
  ## Status: Implemented.
  node.subscribe(topic, some(handler))
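
# A minimal usage sketch: register a relay handler that decodes incoming
# payloads as WakuMessage (the log fields are illustrative).
proc exampleSubscribe(node: WakuNode) =
  proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    let msg = WakuMessage.init(data)
    if msg.isOk():
      info "relay message received", topic=topic, contentTopic=msg.value.contentTopic
  node.subscribe(defaultTopic, handler)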

proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
  ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
  ## FilterHandler is a method that takes a MessagePush.
  ##
  ## Status: Implemented.

  # Sanity check for a well-formed subscribe FilterRequest
  doAssert(request.subscribe, "invalid subscribe request")

  info "subscribe content", filter=request

  var id = generateRequestId(node.rng)

  if not node.wakuFilter.isNil():
    let
      pubsubTopic = request.pubsubTopic
      contentTopics = request.contentFilters.mapIt(it.contentTopic)
    let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)

    if resSubscription.isOk():
      id = resSubscription.get()
    else:
      # Failed to subscribe
      error "remote subscription to filter failed", filter = request
      waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])

  # Register handler for the filter, whether the remote subscription succeeded or not
  node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
  waku_node_filters.set(node.filters.len.int64)
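
# A minimal usage sketch: subscribe to one content topic over the filter
# protocol. It assumes `ContentFilterHandler` takes a single `WakuMessage`
# and that a filter peer was set first; the topic strings are illustrative.
proc exampleFilterSubscribe(node: WakuNode) {.async.} =
  proc handler(msg: WakuMessage) {.gcsafe, closure.} =
    info "filtered message", contentTopic=msg.contentTopic
  let request = FilterRequest(
    pubSubTopic: defaultTopic,
    contentFilters: @[ContentFilter(contentTopic: "/example/1/chat/proto")],
    subscribe: true)
  await node.subscribe(request, handler)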

proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Unsubscribes a handler from a PubSub topic.
  ##
  ## Status: Implemented.
  if node.wakuRelay.isNil:
    error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
    # @TODO improved error handling
    return

  info "unsubscribe", topic=topic

  let wakuRelay = node.wakuRelay
  wakuRelay.unsubscribe(@[(topic, handler)])

proc unsubscribeAll*(node: WakuNode, topic: Topic) =
  ## Unsubscribes all handlers registered on a specific PubSub topic.
  ##
  ## Status: Implemented.

  if node.wakuRelay.isNil:
    error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
    # @TODO improved error handling
    return

  info "unsubscribeAll", topic=topic

  let wakuRelay = node.wakuRelay
  wakuRelay.unsubscribeAll(topic)

proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
  ## Unsubscribe from a content filter.
  ##
  ## Status: Implemented.

  # Sanity check for a well-formed unsubscribe FilterRequest
  doAssert(request.subscribe == false, "invalid unsubscribe request")

  info "unsubscribe content", filter=request

  let
    pubsubTopic = request.pubsubTopic
    contentTopics = request.contentFilters.mapIt(it.contentTopic)
  discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
  node.filters.removeContentFilters(request.contentFilters)

  waku_node_filters.set(node.filters.len.int64)

proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
  ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
  ## `contentTopic` field for light node functionality. This field may also
  ## be omitted.
  ##
  ## Status: Implemented.

  if node.wakuRelay.isNil:
    error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
    # @TODO improved error handling
    return

  let wakuRelay = node.wakuRelay
  trace "publish", topic=topic, contentTopic=message.contentTopic

  let data = message.encode().buffer

  discard await wakuRelay.publish(topic, data)
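
# A minimal usage sketch: build a message and publish it on the default
# pubsub topic (payload and content topic are illustrative).
proc examplePublish(node: WakuNode) {.async.} =
  let msg = WakuMessage(
    payload: toBytes("hello waku"),
    contentTopic: "/example/1/chat/proto")
  await node.publish(defaultTopic, msg)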

proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
  ## Pushes a `WakuMessage` to a node which relays it further on the PubSub topic.
  ## Returns whether relaying was successful or not.
  ## `WakuMessage` should contain a `contentTopic` field for light node
  ## functionality.
  debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic

  let rpc = PushRequest(pubSubTopic: topic, message: message)
  return await node.wakuLightPush.request(rpc)

proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
  discard await node.lightpush(topic, message)
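
# A minimal usage sketch: push a message via a lightpush service node and
# check the result (assumes a lightpush peer was set with `setLightPushPeer`).
proc exampleLightpush(node: WakuNode, msg: WakuMessage) {.async.} =
  let res = await node.lightpush(defaultTopic, msg)
  if res.isErr():
    error "lightpush request failed", error=res.error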

proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
  ## Queries known nodes for historical messages

  # TODO: Once waku swap is less experimental, this can be simplified
  if node.wakuSwap.isNil:
    debug "Using default query"
    return await node.wakuStore.query(query)
  else:
    debug "Using SWAP accounting query"
    # TODO: wakuSwap is now part of the wakuStore object
    return await node.wakuStore.queryWithAccounting(query)
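
# A minimal usage sketch: query history for a single content topic (field
# names follow the waku_store `HistoryQuery`/`HistoryContentFilter` types;
# the content topic is illustrative).
proc exampleStoreQuery(node: WakuNode) {.async.} =
  let req = HistoryQuery(
    contentFilters: @[HistoryContentFilter(contentTopic: "/example/1/chat/proto")])
  let res = await node.query(req)
  if res.isOk():
    info "store query succeeded", messages=res.value.messages.len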

proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
  ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
  ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
  ## messages are stored in the wakuStore's messages field and in the message db
  ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
  ## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
  ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
  ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
  ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
  if node.wakuStore.isNil():
    return

  let retrievedMessages = await node.wakuStore.resume(peerList)
  if retrievedMessages.isErr():
    error "failed to resume store", error=retrievedMessages.error
    return

  info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value

# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc.
proc info*(node: WakuNode): WakuInfo =
  ## Returns information about the Node, such as what multiaddress it can be reached at.
  ##
  ## Status: Implemented.
  ##

  let peerInfo = node.switch.peerInfo

  var listenStr: seq[string]
  for address in node.announcedAddresses:
    var fulladdr = $address & "/p2p/" & $peerInfo.peerId
    listenStr &= fulladdr
  let enrUri = node.enr.toUri()
  let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
  return wakuInfo

proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
  info "mounting filter"
  proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} =

    info "push received"
    for message in msg.messages:
      node.filters.notify(message, requestId) # Trigger filter handlers on a light node

      if not node.wakuStore.isNil and (requestId in node.filters):
        let pubSubTopic = node.filters[requestId].pubSubTopic
        node.wakuStore.handleMessage(pubSubTopic, message)

      waku_node_messages.inc(labelValues = ["filter"])

  node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
  if node.started:
    # Node has started already. Let's start filter too.
    await node.wakuFilter.start()

  node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))

# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.async, raises: [Defect, LPError].} =
  info "mounting swap", mode = $swapConfig.mode

  node.wakuSwap = WakuSwap.init(node.peerManager, node.rng, swapConfig)
  if node.started:
    # Node has started already. Let's start swap too.
    await node.wakuSwap.start()

  node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))

const MessageStoreDefaultRetentionPolicyInterval = 30.minutes

proc executeMessageRetentionPolicy(node: WakuNode) =
  if node.wakuStore.isNil():
    return

  if node.wakuStore.store.isNil():
    return

  debug "executing message retention policy"

  node.wakuStore.executeMessageRetentionPolicy()
  node.wakuStore.reportStoredMessagesMetric()

proc startMessageRetentionPolicyPeriodicTask(node: WakuNode, interval: Duration) =
  if node.wakuStore.isNil():
    return

  if node.wakuStore.store.isNil():
    return

  # https://github.com/nim-lang/Nim/issues/17369
  var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
  executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
    executeMessageRetentionPolicy(node)
    discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)

  discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)

proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy)) {.async, raises: [Defect, LPError].} =
  if node.wakuSwap.isNil():
    info "mounting waku store protocol (no waku swap)"
  else:
    info "mounting waku store protocol with waku swap support"

  node.wakuStore = WakuStore.init(
    node.peerManager,
    node.rng,
    store,
    wakuSwap=node.wakuSwap,
    retentionPolicy=retentionPolicy
  )

  if node.started:
    # Node has started already. Let's start store too.
    await node.wakuStore.start()

  node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
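
# A minimal usage sketch: mount the store protocol with its defaults and arm
# the periodic retention task (both procs are defined above; a persistent
# `MessageStore` and a retention policy would be passed in a real setup).
proc exampleMountStore(node: WakuNode) {.async.} =
  await node.mountStore()
  node.startMessageRetentionPolicyPeriodicTask(MessageStoreDefaultRetentionPolicyInterval)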

proc startRelay*(node: WakuNode) {.async.} =
  if node.wakuRelay.isNil:
    trace "Failed to start relay. Not mounted."
    return

  ## Setup and start relay protocol
  info "starting relay"

  # Topic subscriptions
  for topic in node.wakuRelay.defaultTopics:
    node.subscribe(topic, none(TopicHandler))

  # Resume previous relay connections
  if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
    info "Found previous WakuRelay peers. Reconnecting."

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)

    await node.peerManager.reconnectPeers(WakuRelayCodec,
                                          protocolMatcher(WakuRelayCodec),
                                          backoffPeriod)

  # Start the WakuRelay protocol
  await node.wakuRelay.start()

  info "relay started successfully"

proc mountRelay*(node: WakuNode,
                 topics: seq[string] = newSeq[string](),
                 triggerSelf = true,
                 peerExchangeHandler = none(RoutingRecordsHandler))
  # @TODO: Better error handling: CatchableError is raised by `waitFor`
  {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =

  proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] =
    let mh = MultiHash.digest("sha2-256", m.data)
    if mh.isOk():
      return ok(mh[].data.buffer)
    else:
      return ok(($m.data.hash).toBytes())

  let wakuRelay = WakuRelay.init(
    switch = node.switch,
    msgIdProvider = msgIdProvider,
    triggerSelf = triggerSelf,
    sign = false,
    verifySignature = false,
    maxMessageSize = MaxWakuMessageSize
  )

  info "mounting relay"

  ## The default relay topics are the union of
  ## all configured topics plus the hard-coded defaultTopic(s)
  wakuRelay.defaultTopics = concat(@[defaultTopic], topics)

  ## Add peer exchange handler
  if peerExchangeHandler.isSome():
    wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p
    wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())

  node.wakuRelay = wakuRelay
  if node.started:
    # Node has started already. Let's start relay too.
    await node.startRelay()

  node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))

  info "relay mounted successfully"

proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} =
  info "mounting light push"

  if node.wakuRelay.isNil:
    debug "mounting lightpush without relay"
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
  else:
    debug "mounting lightpush with relay"
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)

  if node.started:
    # Node has started already. Let's start lightpush too.
    await node.wakuLightPush.start()

  node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))

proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
  info "mounting waku peer exchange"

  var discv5Opt: Option[WakuDiscoveryV5]
  if not node.wakuDiscV5.isNil():
    discv5Opt = some(node.wakuDiscV5)
  node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt)

  if node.started:
    # Node has started already. Let's start Waku peer exchange too.
    await node.wakuPeerExchange.start()

  node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))

proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
  info "mounting libp2p ping protocol"

  try:
    node.libp2pPing = Ping.new(rng = node.rng)
  except Exception:
    # This is necessary as `Ping.new*` does not have an explicit `raises` requirement
    # @TODO: remove this exception handling once `raises` is explicit in the ping module
    raise newException(LPError, "Failed to initialize ping protocol")

  if node.started:
    # Node has started already. Let's start ping too.
    await node.libp2pPing.start()

  node.switch.mount(node.libp2pPing)

proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
  while node.started:
    # Keep all connected peers alive while running
    trace "Running keepalive"

    # First get a list of connected peer infos
    let peers = node.peerManager.peers()
                    .filterIt(node.peerManager.connectedness(it.peerId) == Connected)
                    .mapIt(it.toRemotePeerInfo())

    # Attempt to retrieve and ping the active outgoing connection for each peer
    for peer in peers:
      let connOpt = await node.peerManager.dialPeer(peer, PingCodec)

      if connOpt.isNone:
        # @TODO more sophisticated error handling here
        debug "failed to connect to remote peer", peer=peer
        waku_node_errors.inc(labelValues = ["keep_alive_failure"])
        return

      discard await node.libp2pPing.ping(connOpt.get()) # Ping the connection

    await sleepAsync(keepalive)

proc startKeepalive*(node: WakuNode) =
  let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration

  info "starting keepalive", keepalive=defaultKeepalive

  asyncSpawn node.keepaliveLoop(defaultKeepalive)
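
# A minimal usage sketch: ping must be mounted before the keepalive loop can
# dial peers, and the loop only runs while `node.started` is true.
proc exampleKeepalive(node: WakuNode) {.async.} =
  await node.mountLibp2pPing()
  await node.start()
  node.startKeepalive()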

proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) =
  n.wakuStore.setPeer(peer)

proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set store peer", address = address

  let peer = parseRemotePeerInfo(address)
  n.setStorePeer(peer)

proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) =
  n.wakuFilter.setPeer(peer)

proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set filter peer", address = address

  let peer = parseRemotePeerInfo(address)
  n.setFilterPeer(peer)

proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) =
  n.wakuLightPush.setPeer(peer)

proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set lightpush peer", address = address

  let peer = parseRemotePeerInfo(address)
  n.wakuLightPush.setPeer(peer)

proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  info "Set peer exchange peer", address = address

  let remotePeer = parseRemotePeerInfo(address)
  n.wakuPeerExchange.setPeer(remotePeer)

proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
  ## `source` indicates the source of the node addrs (static config, API call, discovery, etc.)
  # NOTE: This is dialing on the WakuRelay protocol specifically
  await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source)

proc runDiscv5Loop(node: WakuNode) {.async.} =
  ## Continuously add newly discovered nodes
  ## using Node Discovery v5
  if node.wakuDiscv5.isNil:
    warn "Trying to run discovery v5 while it's disabled"
    return

  info "Starting discovery loop"

  while node.wakuDiscv5.listening:
    trace "Running discovery loop"
    ## Query for a random target and collect all discovered nodes
    ## @TODO: we could filter nodes here
    let discoveredPeers = await node.wakuDiscv5.findRandomPeers()
    if discoveredPeers.isOk():
      ## Let's attempt to connect to peers we
      ## have not encountered before

      trace "Discovered peers", count=discoveredPeers.get().len()

      let newPeers = discoveredPeers.get().filterIt(
        not node.switch.isConnected(it.peerId))

      if newPeers.len > 0:
        debug "Connecting to newly discovered peers", count=newPeers.len()
        await connectToNodes(node, newPeers, "discv5")

    # Discovery `queryRandom` can have a synchronous fast path, for example
    # when no peers are in the routing table. Don't run it in a continuous loop.
    #
    # Also, give some time to dial the discovered nodes and update stats etc.
    await sleepAsync(5.seconds)

proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Start the Discovery v5 service

  info "Starting discovery v5 service"

  if not node.wakuDiscv5.isNil:
    ## First start listening on the configured port
    try:
      trace "Start listening on discv5 port"
      node.wakuDiscv5.open()
    except CatchableError:
      error "Failed to start discovery service. UDP port may be already in use"
      return false

    ## Start Discovery v5
    trace "Start discv5 service"
    node.wakuDiscv5.start()
    trace "Start discovering new peers using discv5"

    asyncSpawn node.runDiscv5Loop()

    debug "Successfully started discovery v5 service"
    info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri()
    return true

  return false

proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Stop the Discovery v5 service

  if not node.wakuDiscv5.isNil:
    info "Stopping discovery v5 service"

    ## Stop the Discovery v5 process and close the listening port
    if node.wakuDiscv5.listening:
      trace "Stop listening on discv5 port"
      await node.wakuDiscv5.closeWait()

    debug "Successfully stopped discovery v5 service"

proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku Node and
  ## all its mounted protocols.
  ##
  ## Status: Implemented.

  waku_version.set(1, labelValues=[git_version])

  ## NB: careful when moving this. We need to start the switch with the bind address
  ## BEFORE updating with announced addresses for the sake of identify.
  await node.switch.start()

  let peerInfo = node.switch.peerInfo
  info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
  var listenStr = ""
  for address in node.announcedAddresses:
    var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
    listenStr &= fulladdr

  ## XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr
  info "DNS: discoverable ENR ", enr = node.enr.toUri()

  # Perform relay-specific startup tasks. TODO: this should be rethought
  if not node.wakuRelay.isNil:
    await node.startRelay()

  ## Update switch peer info with announced addrs
  node.updateSwitchPeerInfo()

  node.started = true

  info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
  if not node.wakuRelay.isNil:
    await node.wakuRelay.stop()

  if not node.wakuDiscv5.isNil:
    discard await node.stopDiscv5()

  await node.switch.stop()

  node.started = false

{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError

when isMainModule:
@ -827,6 +52,10 @@ when isMainModule:
    ./storage/message/dual_message_store,
    ./storage/message/sqlite_store,
    ./storage/peer/waku_peer_storage

  when defined(rln) or defined(rlnzerokit):
    import ../protocol/waku_rln_relay/waku_rln_relay_utils

  logScope:
    topics = "wakunode.setup"
@ -8,7 +8,7 @@ import
  metrics,
  metrics/chronos_httpserver,
  ./config,
  ./wakunode2,
  ./waku_node,
  ./peer_manager/peer_manager,
  ../protocol/waku_filter,
  ../protocol/waku_store,
@ -6,7 +6,7 @@ import
  presto
import
  ./config,
  ./wakunode2,
  ./waku_node,
  ./rest/server,
  ./rest/debug/debug_api,
  ./rest/relay/[relay_api, topic_cache]
@ -8,7 +8,7 @@ import
import
  ./config,
  ../protocol/waku_message,
  ./wakunode2,
  ./waku_node,
  ./jsonrpc/[admin_api,
             debug_api,
             filter_api,
@ -1,50 +0,0 @@
import
  bearssl/rand,
  eth/p2p/discoveryv5/enr,
  libp2p/crypto/crypto,
  libp2p/protocols/ping,
  ../protocol/waku_relay,
  ../protocol/waku_store,
  ../protocol/waku_swap/waku_swap,
  ../protocol/waku_filter,
  ../protocol/waku_lightpush,
  ../protocol/waku_peer_exchange,
  ../protocol/waku_rln_relay/waku_rln_relay_types,
  ./peer_manager/peer_manager,
  ./discv5/waku_discv5


# key and crypto modules are different
type
  KeyPair* = crypto.KeyPair
  PublicKey* = crypto.PublicKey
  PrivateKey* = crypto.PrivateKey

  # XXX: Weird type, should probably be using the pubsub Topic object name?
  Topic* = string
  Message* = seq[byte]

  WakuInfo* = object
    # NOTE: One for simplicity, can extend later as needed
    listenAddresses*: seq[string]
    enrUri*: string
    #multiaddrStrings*: seq[string]

  # NOTE: based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object of RootObj
    peerManager*: PeerManager
    switch*: Switch
    wakuRelay*: WakuRelay
    wakuStore*: WakuStore
    wakuFilter*: WakuFilter
    wakuSwap*: WakuSwap
    wakuRlnRelay*: WakuRLNRelay
    wakuLightPush*: WakuLightPush
    wakuPeerExchange*: WakuPeerExchange
    enr*: enr.Record
    libp2pPing*: Ping
    filters*: Filters
    rng*: ref rand.HmacDrbgContext
    wakuDiscv5*: WakuDiscoveryV5
    announcedAddresses*: seq[MultiAddress]
    started*: bool # Indicates that the node has started listening
@ -16,8 +16,9 @@ import
  waku_rln_relay_types,
  waku_rln_relay_metrics,
  ../../utils/time,
  ../../node/[wakunode2_types,config],
  ../../node/waku_node,
  ../../../../../apps/chat2/config_chat2,
  ../../node/config, ## TODO: Decouple the protocol code from the app configuration
  ../../../../../apps/chat2/config_chat2, ## TODO: Decouple the protocol code from the app configuration
  ../waku_message

logScope:
@ -5,6 +5,6 @@
# - APACHEv2 ([LICENSE-APACHEv2](../LICENSE-APACHEv2) or https://www.apache.org/licenses/LICENSE-2.0)

## An implementation of the [Waku v1](https://specs.vac.dev/specs/waku/waku.html) and [Waku v2](https://specs.vac.dev/specs/waku/v2/waku-v2.html) protocols in Nim.
import v2/node/wakunode2, v1/node/wakunode1
import v2/node/waku_node as wakunode2, v1/node/wakunode1
export wakunode2
export wakunode1