2024-06-28 16:04:57 +05:30
{. push raises : [ ] . }
2022-10-18 09:05:53 -05:00
import
2024-05-16 22:29:11 +02:00
std / [ hashes , options , sugar , tables , strutils , sequtils , os , net ] ,
2024-03-16 00:08:47 +01:00
chronos ,
chronicles ,
metrics ,
2024-07-09 13:14:28 +02:00
results ,
2022-10-18 09:05:53 -05:00
stew / byteutils ,
eth / keys ,
nimcrypto ,
bearssl / rand ,
eth / p2p / discoveryv5 / enr ,
libp2p / crypto / crypto ,
libp2p / protocols / ping ,
2022-11-04 10:52:08 +01:00
libp2p / protocols / pubsub / gossipsub ,
libp2p / protocols / pubsub / rpc / messages ,
2023-01-11 10:57:49 +01:00
libp2p / protocols / connectivity / autonat / client ,
libp2p / protocols / connectivity / autonat / service ,
2023-06-01 21:43:10 +02:00
libp2p / protocols / rendezvous ,
2022-11-04 10:52:08 +01:00
libp2p / builders ,
2023-11-16 18:15:27 +02:00
libp2p / transports / transport ,
2022-11-04 10:52:08 +01:00
libp2p / transports / tcptransport ,
libp2p / transports / wstransport
2022-10-18 09:05:53 -05:00
import
2023-04-19 13:29:23 +02:00
.. / waku_core ,
2023-09-26 07:33:52 -04:00
.. / waku_core / topics / sharding ,
2023-04-18 15:22:10 +02:00
.. / waku_relay ,
.. / waku_archive ,
2024-07-12 12:19:12 -04:00
.. / waku_archive_legacy ,
2024-04-25 09:09:52 -04:00
.. / waku_store_legacy / protocol as legacy_store ,
.. / waku_store_legacy / client as legacy_store_client ,
.. / waku_store_legacy / common as legacy_store_common ,
.. / waku_store / protocol as store ,
2023-04-18 15:22:10 +02:00
.. / waku_store / client as store_client ,
2024-04-25 09:09:52 -04:00
.. / waku_store / common as store_common ,
2024-07-30 07:23:39 -04:00
.. / waku_store / resume ,
2023-04-18 15:22:10 +02:00
.. / waku_filter_v2 ,
2023-09-14 21:28:57 +02:00
.. / waku_filter_v2 / client as filter_client ,
2024-01-16 17:27:40 +01:00
.. / waku_filter_v2 / subscriptions as filter_subscriptions ,
2023-10-11 08:58:45 +02:00
.. / waku_metadata ,
2024-08-13 07:27:34 -04:00
.. / waku_sync ,
2023-04-18 15:22:10 +02:00
.. / waku_lightpush / client as lightpush_client ,
2024-01-30 07:28:21 -05:00
.. / waku_lightpush / common ,
.. / waku_lightpush / protocol ,
2024-04-26 12:42:47 +02:00
.. / waku_lightpush / self_req_handler ,
2024-06-13 21:10:00 +04:00
.. / waku_lightpush / callbacks ,
2023-04-18 15:22:10 +02:00
.. / waku_enr ,
.. / waku_peer_exchange ,
2023-09-11 12:02:31 +05:30
.. / waku_rln_relay ,
2023-04-05 09:46:13 +02:00
. / config ,
2024-04-15 15:28:35 +02:00
. / peer_manager ,
2024-10-28 09:17:46 +01:00
.. / common / rate_limit / setting ,
.. / discovery / autonat_service
2022-10-18 09:05:53 -05:00
# Metrics exported by the node.

# Counts received messages; the "type" label identifies the receiving path.
declarePublicCounter waku_node_messages, "number of messages received", ["type"]

# Payload-size distribution of received messages, in kB.
declarePublicHistogram waku_histogram_message_size,
  "message size histogram in kB",
  buckets = [
    0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
  ]

declarePublicGauge waku_version,
  "Waku version info (in git describe format)", ["version"]

# Error gauge; the "type" label names the failing operation.
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]

# Per-protocol peer gauges.
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers,
  "number of peers (in the node's peerManager) supporting the peer exchange protocol"
2022-10-28 16:30:01 +02:00
2022-10-18 09:05:53 -05:00
# Every log statement in this module is tagged with this topic.
logScope:
  topics = "waku node"
2022-10-18 09:05:53 -05:00
2023-04-05 09:46:13 +02:00
# TODO: Move to application instance (e.g., `WakuNode2`)
2022-10-18 09:05:53 -05:00
# Git version in git describe format. `strdefine` lets the build override the
# value at compile time; "n/a" is used when no value is supplied.
const git_version* {.strdefine.} = "n/a"

# Default clientId
const clientId* = "Nimbus Waku v2 node"

# Human-readable version banner built from `git_version`.
const WakuNodeVersionString* = "version / git commit hash: " & git_version
2022-10-18 09:05:53 -05:00
# key and crypto modules different
type
  # TODO: Move to application instance (e.g., `WakuNode2`)
  WakuInfo* = object # NOTE One for simplicity, can extend later as needed
    ## Snapshot of node information, as returned by `info`.
    listenAddresses*: seq[string] ## announced addresses with "/p2p/<peerId>" appended
    enrUri*: string ## the node's ENR in URI form
      #multiaddrStrings*: seq[string]

  # NOTE based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object
    ## Central node object: holds the libp2p switch, the peer manager and one
    ## (possibly nil) handle per mounted Waku protocol.

    # Networking core
    peerManager*: PeerManager
    switch*: Switch

    # Mounted protocols; a nil ref means "not mounted"
    wakuRelay*: WakuRelay
    wakuArchive*: waku_archive.WakuArchive
    wakuLegacyArchive*: waku_archive_legacy.WakuArchive
    wakuLegacyStore*: legacy_store.WakuStore
    wakuLegacyStoreClient*: legacy_store_client.WakuStoreClient
    wakuStore*: store.WakuStore
    wakuStoreClient*: store_client.WakuStoreClient
    wakuStoreResume*: StoreResume
    wakuFilter*: waku_filter_v2.WakuFilter
    wakuFilterClient*: filter_client.WakuFilterClient
    wakuRlnRelay*: WakuRLNRelay
    wakuLightPush*: WakuLightPush
    wakuLightpushClient*: WakuLightPushClient
    wakuPeerExchange*: WakuPeerExchange
    wakuMetadata*: WakuMetadata
    wakuSharding*: Sharding
    wakuSync*: WakuSync

    # Identity and transport-level helpers
    enr*: enr.Record
    libp2pPing*: Ping
    rng*: ref rand.HmacDrbgContext
    rendezvous*: RendezVous
    announcedAddresses*: seq[MultiAddress]

    started*: bool # Indicates that node has started listening

    # Subscription bookkeeping
    topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
    contentTopicHandlers: Table[ContentTopic, TopicHandler]
      # module-private: handlers registered through content-topic subscriptions

    rateLimitSettings*: ProtocolRateLimitSettings
2022-10-18 09:05:53 -05:00
2024-03-16 00:08:47 +01:00
proc new*(
    T: type WakuNode,
    netConfig: NetConfig,
    enr: enr.Record,
    switch: Switch,
    peerManager: PeerManager,
    # TODO: make this argument required after tests are updated
    rng: ref HmacDrbgContext = crypto.newRng(),
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
  ## Creates a Waku Node instance.
  ##
  ## The node takes its announced addresses from `netConfig`, gets a fresh
  ## topic-subscription event queue and the default protocol rate limits.
  ## No protocol is mounted here.
  info "Initializing networking", addrs = $netConfig.announcedAddresses

  let subscriptionQueue = newAsyncEventQueue[SubscriptionEvent](0)

  return WakuNode(
    peerManager: peerManager,
    switch: switch,
    rng: rng,
    enr: enr,
    announcedAddresses: netConfig.announcedAddresses,
    topicSubscriptionQueue: subscriptionQueue,
    rateLimitSettings: DefaultProtocolRateLimit,
  )
2022-11-21 09:36:41 +01:00
proc peerInfo*(node: WakuNode): PeerInfo =
  ## The peer info held by the node's libp2p switch.
  return node.switch.peerInfo
2023-02-27 17:36:24 +02:00
proc peerId*(node: WakuNode): PeerId =
  ## The peer id taken from the switch's peer info.
  return node.switch.peerInfo.peerId
2023-04-05 09:46:13 +02:00
# TODO: Move to application instance (e.g., `WakuNode2`)
2022-10-27 17:29:09 +02:00
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
  ## Returns information about the Node, such as what multiaddress it can be reached at.
  ##
  ## Each announced address is rendered as `<address>/p2p/<peerId>`; the ENR is
  ## returned in its URI form.
  let peerSuffix = "/p2p/" & $node.switch.peerInfo.peerId

  return WakuInfo(
    listenAddresses: node.announcedAddresses.mapIt($it & peerSuffix),
    enrUri: node.enr.toUri(),
  )
2024-03-16 00:08:47 +01:00
proc connectToNodes*(
    node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api"
) {.async.} =
  ## Connects to `nodes` by delegating to the peer manager.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  # NOTE Connects to the node without a given protocol, which automatically creates streams for relay
  await peer_manager.connectToNodes(node.peerManager, nodes, source = source)
2022-10-27 17:29:09 +02:00
2024-09-24 18:20:29 +02:00
proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
  ## Disconnects from `remotePeer` by delegating to the peer manager.
  await peer_manager.disconnectNode(node.peerManager, remotePeer)
2024-08-13 07:27:34 -04:00
## Waku Sync
proc mountWakuSync*(
    node: WakuNode,
    maxFrameSize: int = DefaultMaxFrameSize,
    syncRange: timer.Duration = DefaultSyncRange,
    syncInterval: timer.Duration = DefaultSyncInterval,
    relayJitter: Duration = DefaultGossipSubJitter,
): Future[Result[void, string]] {.async.} =
  ## Creates the Waku Sync protocol, mounts it on the switch and starts it when
  ## the node is already running.
  ##
  ## Returns an error string when sync is already mounted, when `WakuSync.new`
  ## fails, or when mounting on the switch raises.
  if not node.wakuSync.isNil():
    return err("already mounted")

  let created = await WakuSync.new(
    peerManager = node.peerManager,
    maxFrameSize = maxFrameSize,
    syncRange = syncRange,
    syncInterval = syncInterval,
    relayJitter = relayJitter,
    wakuArchive = node.wakuArchive,
    wakuStoreClient = node.wakuStoreClient,
  )
  node.wakuSync = created.valueOr:
    return err("initialization failed: " & error)

  # `mount` may raise; turn the exception into a Result.
  let mounted = catch:
    node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
  if mounted.isErr():
    return err("switch mounting failed: " & mounted.error.msg)

  if node.started:
    node.wakuSync.start()

  return ok()
2023-11-21 15:15:39 -05:00
## Waku Metadata
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
  ## Creates the Waku Metadata protocol for `clusterId`, shares it with the
  ## peer manager and mounts it on the switch.
  ##
  ## Returns an error when metadata is already mounted or when mounting raises.
  if not node.wakuMetadata.isNil():
    return err("Waku metadata already mounted, skipping")

  node.wakuMetadata =
    WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
  # The peer manager keeps a handle to the same instance.
  node.peerManager.wakuMetadata = node.wakuMetadata

  let mounted = catch:
    node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec))
  if mounted.isErr():
    return err(mounted.error.msg)

  return ok()
2022-10-27 17:29:09 +02:00
2024-03-13 10:58:13 +01:00
## Waku Sharding
2024-03-16 00:08:47 +01:00
proc mountSharding*(
    node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
  ## Stores the sharding configuration (`clusterId`, generation-zero shard
  ## count) on the node. Always returns `ok()`.
  info "mounting sharding", clusterId = clusterId, shardCount = shardCount

  let sharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
  node.wakuSharding = sharding

  return ok()
2022-10-27 17:29:09 +02:00
## Waku relay
2023-02-08 16:09:59 +01:00
proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
  ## Subscribes the relay to `topic` with the node's default handler, unless the
  ## relay is already subscribed to that topic.
  ##
  ## The default handler runs, in order, for every received message:
  ## metrics tracing, filter, archive (legacy archive takes precedence), sync.
  ## Handlers for protocols that are not mounted are no-ops.
  if node.wakuRelay.isSubscribed(topic):
    return

  proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    # Metrics only. The message hash that used to be computed here was never
    # read, so it is no longer calculated for every relayed message.
    let msgSizeKB = msg.payload.len / 1000

    waku_node_messages.inc(labelValues = ["relay"])
    waku_histogram_message_size.observe(msgSizeKB)

  proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if node.wakuFilter.isNil():
      return

    await node.wakuFilter.handleMessage(topic, msg)

  proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if not node.wakuLegacyArchive.isNil():
      ## we try to store with legacy archive
      await node.wakuLegacyArchive.handleMessage(topic, msg)
      return

    if node.wakuArchive.isNil():
      return

    await node.wakuArchive.handleMessage(topic, msg)

  proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async.} =
    if node.wakuSync.isNil():
      return

    node.wakuSync.messageIngress(topic, msg)

  let defaultHandler = proc(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    await traceHandler(topic, msg)
    await filterHandler(topic, msg)
    await archiveHandler(topic, msg)
    await syncHandler(topic, msg)

  # The wrapped handler returned by the relay is not needed here.
  discard node.wakuRelay.subscribe(topic, defaultHandler)
2022-10-18 09:05:53 -05:00
2024-03-16 00:08:47 +01:00
proc subscribe*(
    node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)
) =
  ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
  ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
  ##
  ## For a content-topic subscription the pubsub topic is derived through the
  ## node's sharding configuration. Misuse (relay not mounted, unsupported
  ## event kind, duplicate content-topic subscription) is logged and ignored.

  if node.wakuRelay.isNil():
    error "Invalid API call to `subscribe`. WakuRelay not mounted."
    return

  let (pubsubTopic, contentTopicOp) =
    case subscription.kind
    of PubsubSub:
      (subscription.topic, none(ContentTopic))
    of ContentSub:
      let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
        error "Autosharding error", error = error
        return

      ($shard, some(subscription.topic))
    else:
      # Unsubscribe kinds are not handled by this proc.
      return

  let alreadySubscribed =
    contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get())
  if alreadySubscribed:
    error "Invalid API call to `subscribe`. Was already subscribed"
    return

  node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
  node.registerRelayDefaultHandler(pubsubTopic)

  if handler.isNone():
    return

  let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get())

  # Remember the wrapped handler so a later content-topic unsubscribe can
  # remove exactly this one.
  if contentTopicOp.isSome():
    node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler
2022-11-21 09:36:41 +01:00
2023-09-26 07:33:52 -04:00
proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) =
  ## Unsubscribes from a specific PubSub or Content topic.
  ##
  ## A content-topic unsubscribe removes only the handler registered for that
  ## content topic. A pubsub-topic unsubscribe removes every handler on the
  ## topic and emits a `PubsubUnsub` event; the same happens after a
  ## content-topic unsubscribe when the relay then reports exactly one entry
  ## left for the topic.

  if node.wakuRelay.isNil():
    error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
    return

  let (pubsubTopic, contentTopicOp) =
    case subscription.kind
    of PubsubUnsub:
      (subscription.topic, none(ContentTopic))
    of ContentUnsub:
      let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
        error "Autosharding error", error = error
        return

      ($shard, some(subscription.topic))
    else:
      # Subscribe kinds are not handled by this proc.
      return

  if not node.wakuRelay.isSubscribed(pubsubTopic):
    error "Invalid API call to `unsubscribe`. Was not subscribed"
    return

  if contentTopicOp.isSome():
    # Remove this handler only
    var contentHandler: TopicHandler
    if node.contentTopicHandlers.pop(contentTopicOp.get(), contentHandler):
      debug "unsubscribe", contentTopic = contentTopicOp.get()
      node.wakuRelay.unsubscribe(pubsubTopic, contentHandler)

  let removeAll =
    contentTopicOp.isNone() or
    node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1
  if removeAll:
    # Remove all handlers
    debug "unsubscribe", pubsubTopic = pubsubTopic
    node.wakuRelay.unsubscribeAll(pubsubTopic)
    node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
proc publish*(
    node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage
): Future[Result[void, string]] {.async, gcsafe.} =
  ## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard.
  ## `WakuMessage` should contain a `contentTopic` field for light node functionality.
  ## It is also used to determine the shard.
  ##
  ## Returns an error when relay is not mounted or when no pubsub topic was
  ## given and the shard cannot be derived from the content topic.

  if node.wakuRelay.isNil():
    const notMounted =
      "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
    error "publish error", err = notMounted
    # TODO: Improve error handling
    return err(notMounted)

  # Fall back to autosharding when the caller did not name a pubsub topic.
  let pubsubTopic = pubsubTopicOp.valueOr:
    node.wakuSharding.getShard(message.contentTopic).valueOr:
      let shardingErr = "Autosharding error: " & error
      error "publish error", err = shardingErr
      return err(shardingErr)

  #TODO instead of discard return error when 0 peers received the message
  discard await node.wakuRelay.publish(pubsubTopic, message)

  notice "waku.relay published",
    peerId = node.peerId,
    pubsubTopic = pubsubTopic,
    msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
    publishTime = getNowInNanosecondTime()

  return ok()
2023-02-03 10:06:21 +02:00
2022-10-27 17:29:09 +02:00
proc startRelay*(node: WakuNode) {.async.} =
  ## Setup and start relay protocol
  ##
  ## When the peer store already holds relay peers they are reconnected first,
  ## using the relay's prune backoff plus `BackoffSlackTime` seconds as the
  ## backoff period. Does nothing (besides logging) when relay is not mounted.
  info "starting relay protocol"

  if node.wakuRelay.isNil():
    error "Failed to start relay. Not mounted."
    return

  ## Setup relay protocol

  # Resume previous relay connections
  let hasKnownRelayPeers =
    node.peerManager.wakuPeerStore.hasPeers(protocolMatcher(WakuRelayCodec))
  if hasKnownRelayPeers:
    info "Found previous WakuRelay peers. Reconnecting."

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let slack = chronos.seconds(BackoffSlackTime)
    let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + slack

    await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)

  # Start the WakuRelay protocol
  await node.wakuRelay.start()

  info "relay started successfully"
2022-10-18 09:05:53 -05:00
2024-03-16 00:08:47 +01:00
proc mountRelay*(
    node: WakuNode,
    shards: seq[RelayShard] = @[],
    peerExchangeHandler = none(RoutingRecordsHandler),
    maxMessageSize = int(DefaultMaxWakuMessageSize),
) {.async, gcsafe.} =
  ## Creates the relay protocol, optionally enables peer exchange, starts the
  ## relay if the node is already running, mounts it on the switch and finally
  ## subscribes to every shard in `shards`.
  ##
  ## Logs and returns without mounting when relay is already mounted or when
  ## `WakuRelay.new` fails.
  if not node.wakuRelay.isNil():
    error "wakuRelay already mounted, skipping"
    return

  ## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
  info "mounting relay protocol"

  node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr:
    error "failed mounting relay protocol", error = error
    return

  ## Add peer exchange handler
  if peerExchangeHandler.isSome():
    # Feature flag for peer exchange in nim-libp2p
    node.wakuRelay.parameters.enablePX = true
    node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())

  if node.started:
    await node.startRelay()

  node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))

  info "relay mounted successfully", shards = shards

  # Subscribe to shards
  for relayShard in shards:
    node.subscribe((kind: PubsubSub, topic: $relayShard))
2023-02-10 15:17:50 +01:00
2022-10-27 17:29:09 +02:00
## Waku filter
2022-10-18 09:05:53 -05:00
2024-03-16 00:08:47 +01:00
proc mountFilter*(
    node: WakuNode,
    subscriptionTimeout: Duration =
      filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
    maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
    maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
    messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
    rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit,
) {.async: (raises: []).} =
  ## Mounting filter v2 protocol
  ##
  ## Creates the filter service, starts it when the node is already running and
  ## mounts it on the switch. Start and mount failures are logged, never raised.
  info "mounting filter protocol"

  let filterService = WakuFilter.new(
    node.peerManager,
    subscriptionTimeout,
    maxFilterPeers,
    maxFilterCriteriaPerPeer,
    messageCacheTTL,
    some(rateLimitSetting),
  )
  node.wakuFilter = filterService

  if node.started:
    try:
      await node.wakuFilter.start()
    except CatchableError as exc:
      error "failed to start wakuFilter", error = exc.msg

  # Mounting is attempted even if starting failed above.
  try:
    node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
  except LPError as exc:
    error "failed to mount wakuFilter", error = exc.msg
2023-09-14 21:28:57 +02:00
2024-03-16 00:08:47 +01:00
proc filterHandleMessage*(
    node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage
) {.async.} =
  ## Hands `message` to the mounted filter service; logs an error and returns
  ## when the filter service is not mounted.
  if not node.wakuFilter.isNil():
    await node.wakuFilter.handleMessage(pubsubTopic, message)
  else:
    error "cannot handle filter message", error = "waku filter is required"
2022-10-27 17:29:09 +02:00
2024-05-16 22:29:11 +02:00
proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
  ## Mounts the filter v2 client.
  ##
  ## Creates the client, starts it when the node is already running and mounts
  ## it on the switch. Start and mount failures are logged, never raised.
  info "mounting filter client"

  let client = WakuFilterClient.new(node.peerManager, node.rng)
  node.wakuFilterClient = client

  if node.started:
    try:
      await node.wakuFilterClient.start()
    except CatchableError as exc:
      error "failed to start wakuFilterClient", error = exc.msg

  # Mounting is attempted even if starting failed above.
  try:
    node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))
  except LPError as exc:
    error "failed to mount wakuFilterClient", error = exc.msg
2024-03-16 00:08:47 +01:00
proc filterSubscribe*(
    node: WakuNode,
    pubsubTopic: Option[PubsubTopic],
    contentTopics: ContentTopic | seq[ContentTopic],
    peer: RemotePeerInfo | string,
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
  ##
  ## With a pubsub topic, a single subscribe request is sent to `peer`.
  ## Without one, the content topics are grouped by shard through the node's
  ## sharding configuration and one request is sent per shard; the result is
  ## then the last error seen, or ok.
  if node.wakuFilterClient.isNil():
    error "cannot register filter subscription to topic",
      error = "waku filter client is not set up"
    return err(FilterSubscribeError.serviceUnavailable())

  let parsedPeer = parsePeerInfo(peer)
  if parsedPeer.isErr():
    error "Couldn't parse the peer info properly", error = parsedPeer.error
    return err(FilterSubscribeError.serviceUnavailable("No peers available"))

  let remotePeer = parsedPeer.value

  if pubsubTopic.isSome():
    info "registering filter subscription to content",
      pubsubTopic = pubsubTopic.get(),
      contentTopics = contentTopics,
      peer = remotePeer.peerId

    # Normalise a single content topic to a one-element seq.
    when (contentTopics is ContentTopic):
      let contentTopics = @[contentTopics]

    let subRes = await node.wakuFilterClient.subscribe(
      remotePeer, pubsubTopic.get(), contentTopics
    )
    if subRes.isErr():
      error "failed filter v2 subscription", error = subRes.error
      waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
    else:
      info "v2 subscribed to topic",
        pubsubTopic = pubsubTopic, contentTopics = contentTopics

      # Purpose is to update Waku Metadata
      node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic.get()))

    return subRes

  # No pubsub topic given: derive the shards from the content topics.
  let shardingRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)
  if shardingRes.isErr():
    error "can't get shard", error = shardingRes.error
    return err(FilterSubscribeError.badResponse("can't get shard"))

  let topicMap = shardingRes.get()

  # One subscribe request per shard.
  let requests = collect(newSeq):
    for pubsub, topics in topicMap.pairs:
      info "registering filter subscription to content",
        pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
      let content = topics.mapIt($it)
      node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)

  var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
  try:
    let finished = await allFinished(requests)

    for fut in finished:
      let outcome = fut.read()

      if outcome.isErr():
        error "failed filter subscription", error = outcome.error
        waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
        subRes = FilterSubscribeResult.err(outcome.error)

    for pubsub, topics in topicMap.pairs:
      info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics

      # Purpose is to update Waku Metadata
      node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: $pubsub))
  except CatchableError as exc:
    let errMsg = "exception in filterSubscribe: " & exc.msg
    error "exception in filterSubscribe", error = exc.msg
    waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
    subRes =
      FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

  # return the last error or ok
  return subRes
2024-03-16 00:08:47 +01:00
proc filterUnsubscribe*(
    node: WakuNode,
    pubsubTopic: Option[PubsubTopic],
    contentTopics: ContentTopic | seq[ContentTopic],
    peer: RemotePeerInfo | string,
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Unsubscribe from a content filter V2.
  ##
  ## With a pubsub topic, a single unsubscribe request is sent to `peer`.
  ## Without one, the content topics are grouped by shard through the node's
  ## sharding configuration and one request is sent per shard; the result is
  ## then the last error seen, or ok.
  ##
  ## Returns a service-unavailable error when the filter client is not mounted
  ## or when `peer` cannot be parsed.

  # Same guard as `filterSubscribe`: without it an unmounted client would be
  # dereferenced as nil below.
  if node.wakuFilterClient.isNil():
    error "cannot unregister filter subscription to topic",
      error = "waku filter client is not set up"
    return err(FilterSubscribeError.serviceUnavailable())

  let remotePeerRes = parsePeerInfo(peer)
  if remotePeerRes.isErr():
    error "couldn't parse remotePeerInfo", error = remotePeerRes.error
    return err(FilterSubscribeError.serviceUnavailable("No peers available"))

  let remotePeer = remotePeerRes.value

  if pubsubTopic.isSome():
    info "deregistering filter subscription to content",
      pubsubTopic = pubsubTopic.get(),
      contentTopics = contentTopics,
      peer = remotePeer.peerId

    let unsubRes = await node.wakuFilterClient.unsubscribe(
      remotePeer, pubsubTopic.get(), contentTopics
    )
    if unsubRes.isOk():
      info "unsubscribed from topic",
        pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics

      # Purpose is to update Waku Metadata
      node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic.get()))
    else:
      error "failed filter unsubscription", error = unsubRes.error
      waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])

    return unsubRes
  else: # pubsubTopic.isNone
    let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)

    let topicMap =
      if topicMapRes.isErr():
        error "can't get shard", error = topicMapRes.error
        return err(FilterSubscribeError.badResponse("can't get shard"))
      else:
        topicMapRes.get()

    # One unsubscribe request per shard.
    let futures = collect(newSeq):
      for pubsub, topics in topicMap.pairs:
        info "deregistering filter subscription to content",
          pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
        let content = topics.mapIt($it)
        node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)

    var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
    try:
      let finished = await allFinished(futures)

      for fut in finished:
        let res = fut.read()

        if res.isErr():
          error "failed filter unsubscription", error = res.error
          waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
          unsubRes = FilterSubscribeResult.err(res.error)

      for pubsub, topics in topicMap.pairs:
        info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics

        # Purpose is to update Waku Metadata
        node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: $pubsub))
    except CatchableError:
      let errMsg = "exception in filterUnsubscribe: " & getCurrentExceptionMsg()
      error "exception in filterUnsubscribe", error = getCurrentExceptionMsg()
      waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
      unsubRes =
        FilterSubscribeResult.err(FilterSubscribeError.serviceUnavailable(errMsg))

    # return the last error or ok
    return unsubRes
2024-03-16 00:08:47 +01:00
proc filterUnsubscribeAll*(
    node: WakuNode, peer: RemotePeerInfo | string
): Future[FilterSubscribeResult] {.async: (raises: []).} =
  ## Asks a filter-v2 service peer to drop every subscription it holds for us.
  ##
  ## `peer` is either a `RemotePeerInfo` or something `parsePeerInfo` accepts.
  ## When it cannot be parsed a `serviceUnavailable` error is returned;
  ## otherwise the result of the client's `unsubscribeAll` call is returned
  ## as-is (a failure is additionally logged and counted in
  ## `waku_node_errors`).
  let remotePeer = parsePeerInfo(peer).valueOr:
    error " couldn ' t parse remotePeerInfo ", error = error
    return err(FilterSubscribeError.serviceUnavailable(" No peers available "))

  info " deregistering all filter subscription to content ",
    peer = remotePeer.peerId

  let outcome = await node.wakuFilterClient.unsubscribeAll(remotePeer)
  if outcome.isErr():
    error " failed filter unsubscription from all content-topic ",
      error = outcome.error
    waku_node_errors.inc(labelValues = [" unsubscribe_filter_failure "])
  else:
    info " unsubscribed from all content-topic ", peerId = remotePeer.peerId

  return outcome
2022-10-27 17:29:09 +02:00
2023-09-14 21:28:57 +02:00
# NOTICE: the generic subscribe / unsubscribe procs have been removed. They were already
# deprecated and could not handle both types of filters; use the specific filter registration procs instead.
2022-10-27 17:29:09 +02:00
2022-11-23 10:08:00 +01:00
## Waku archive
2024-03-16 00:08:47 +01:00
proc mountArchive*(
    node: WakuNode,
    driver: waku_archive.ArchiveDriver,
    retentionPolicy = none(waku_archive.RetentionPolicy),
): Result[void, string] =
  ## Builds a `WakuArchive` on top of `driver` (with the optional
  ## `retentionPolicy`), stores it in `node.wakuArchive` and starts it.
  ##
  ## Returns an error string, leaving `node.wakuArchive` untouched, when the
  ## archive cannot be constructed.
  let archiveRes = waku_archive.WakuArchive.new(
    driver = driver, retentionPolicy = retentionPolicy
  )
  if archiveRes.isErr():
    return err(" error in mountArchive: " & archiveRes.error)

  node.wakuArchive = archiveRes.get()
  node.wakuArchive.start()

  ok()
2022-10-18 09:05:53 -05:00
2024-07-12 12:19:12 -04:00
proc mountLegacyArchive*(
    node: WakuNode, driver: waku_archive_legacy.ArchiveDriver
): Result[void, string] =
  ## Builds the legacy `WakuArchive` on top of `driver` and stores it in
  ## `node.wakuLegacyArchive`.
  ##
  ## Returns an error string, leaving the node untouched, when the archive
  ## cannot be constructed.
  let archiveRes = waku_archive_legacy.WakuArchive.new(driver = driver)
  if archiveRes.isErr():
    return err(" error in mountLegacyArchive: " & archiveRes.error)

  node.wakuLegacyArchive = archiveRes.get()
  ok()
2024-04-25 09:09:52 -04:00
## Legacy Waku Store
2022-11-23 10:08:00 +01:00
# TODO: Review this mapping logic. Maybe move it to the application code.
2024-07-12 12:19:12 -04:00
proc toArchiveQuery(
    request: legacy_store_common.HistoryQuery
): waku_archive_legacy.ArchiveQuery =
  ## Translates a legacy store `HistoryQuery` into the equivalent legacy
  ## archive `ArchiveQuery`, copying every field and converting the optional
  ## pagination cursor field by field.
  let archiveCursor =
    if request.cursor.isSome():
      let histCursor = request.cursor.get()
      some(
        waku_archive_legacy.ArchiveCursor(
          pubsubTopic: histCursor.pubsubTopic,
          senderTime: histCursor.senderTime,
          storeTime: histCursor.storeTime,
          digest: histCursor.digest,
        )
      )
    else:
      none(waku_archive_legacy.ArchiveCursor)

  waku_archive_legacy.ArchiveQuery(
    pubsubTopic: request.pubsubTopic,
    contentTopics: request.contentTopics,
    cursor: archiveCursor,
    startTime: request.startTime,
    endTime: request.endTime,
    pageSize: request.pageSize.uint,
    direction: request.direction,
    requestId: request.requestId,
  )
2022-11-23 10:08:00 +01:00
# TODO: Review this mapping logic. Maybe move it to the application code.
2024-07-12 12:19:12 -04:00
proc toHistoryResult*(
    res: waku_archive_legacy.ArchiveResult
): legacy_store_common.HistoryResult =
  ## Translates a legacy archive result into a legacy store `HistoryResult`.
  ##
  ## * success: messages are copied and the optional cursor is converted
  ##   field by field;
  ## * `DRIVER_ERROR` / `INVALID_QUERY`: mapped to `BAD_REQUEST`, keeping the
  ##   archive error's `cause`;
  ## * any other error kind: mapped to `UNKNOWN`.
  if res.isOk():
    let archived = res.get()
    let nextCursor =
      if archived.cursor.isSome():
        let archiveCursor = archived.cursor.get()
        some(
          HistoryCursor(
            pubsubTopic: archiveCursor.pubsubTopic,
            senderTime: archiveCursor.senderTime,
            storeTime: archiveCursor.storeTime,
            digest: archiveCursor.digest,
          )
        )
      else:
        none(HistoryCursor)

    return ok(HistoryResponse(messages: archived.messages, cursor: nextCursor))

  let failure = res.error
  case failure.kind
  of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR,
      waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY:
    return
      err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: failure.cause))
  else:
    return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
2022-11-23 10:08:00 +01:00
2024-04-25 09:09:52 -04:00
proc mountLegacyStore*(
    node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
  ## Creates the legacy store protocol, backs it with `node.wakuLegacyArchive`
  ## and mounts it on the switch.
  ##
  ## Does nothing (apart from logging an error) when the legacy archive has
  ## not been mounted. If the node is already running the protocol is started
  ## before it is mounted.
  info " mounting waku legacy store protocol "

  if node.wakuLegacyArchive.isNil():
    error " failed to mount waku legacy store protocol ",
      error = " waku archive not set "
    return

  # TODO: Review this handler logic. Maybe move it to the application code.
  let queryHandler: HistoryQueryHandler = proc(
      request: HistoryQuery
  ): Future[legacy_store_common.HistoryResult] {.async.} =
    # Reject the query early when its cursor does not pass validation.
    if request.cursor.isSome():
      let cursorCheck = request.cursor.get().checkHistCursor()
      if cursorCheck.isErr():
        return err(cursorCheck.error)

    let archiveQuery = request.toArchiveQuery()
    let archiveRes = await node.wakuLegacyArchive.findMessagesV2(archiveQuery)
    return archiveRes.toHistoryResult()

  node.wakuLegacyStore = legacy_store.WakuStore.new(
    node.peerManager, node.rng, queryHandler, some(rateLimit)
  )

  if node.started:
    # Node has started already. Let's start store too.
    await node.wakuLegacyStore.start()

  node.switch.mount(
    node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec)
  )
2022-10-18 09:05:53 -05:00
2024-04-25 09:09:52 -04:00
proc mountLegacyStoreClient*(node: WakuNode) =
  ## Creates the legacy store client from the node's peer manager and RNG and
  ## stores it in `node.wakuLegacyStoreClient`.
  info " mounting legacy store client "

  let client = legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng)
  node.wakuLegacyStoreClient = client
2022-11-02 11:59:58 +01:00
2024-03-16 00:08:47 +01:00
proc query*(
    node: WakuNode, query: legacy_store_common.HistoryQuery, peer: RemotePeerInfo
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
    async, gcsafe
.} =
  ## Sends a legacy store `query` to `peer` and returns its response.
  ##
  ## Fails with a descriptive string when the legacy store client is not
  ## mounted or when the client reports an error.
  if node.wakuLegacyStoreClient.isNil():
    return err(" waku legacy store client is nil ")

  let response = (await node.wakuLegacyStoreClient.query(query, peer)).valueOr:
    return err(" legacy store client query error: " & $error)

  return ok(response)
# TODO: Move to application module (e.g., wakunode2.nim)
2024-03-16 00:08:47 +01:00
proc query*(
    node: WakuNode, query: legacy_store_common.HistoryQuery
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
    async, gcsafe, deprecated: " Use ' node.query() ' with peer destination instead "
.} =
  ## Sends a legacy store `query` to a peer picked by the peer manager for the
  ## legacy store codec.
  ##
  ## Fails when the legacy store client is not mounted or when no peer can be
  ## selected; otherwise behaves like the overload taking an explicit peer.
  if node.wakuLegacyStoreClient.isNil():
    return err(" waku legacy store client is nil ")

  let selected = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec)
  if selected.isSome():
    return await node.query(query, selected.get())

  error " no suitable remote peers "
  return err(" peer_not_found_failure ")
2022-10-18 09:05:53 -05:00
2022-11-23 10:08:00 +01:00
when defined(waku_exp_store_resume):
  # TODO: Move to application module (e.g., wakunode2.nim)
  proc resume*(
      node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])
  ) {.async, gcsafe.} =
    ## Retrieves the history of waku messages published on the default waku
    ## pubsub topic since the last time this node was online.
    ##
    ## * The node must have the store protocol mounted in full mode (i.e.
    ##   persisting messages) for resume to work properly.
    ## * Messages are stored in the wakuStore's messages field and in the
    ##   message db.
    ## * The offline window is the difference between the current time and the
    ##   timestamp of the most recent persisted message; a 20 second offset is
    ##   added to account for asynchrony between nodes.
    ## * `peerList` lists the peers to query; the history is fetched from the
    ##   first available one. Such candidates should be found through a
    ##   discovery method (to be developed). Without a `peerList`, a random
    ##   peer of the store protocol's peer manager is used.
    ## * The fetch succeeds if the dialed peer was online during the queried
    ##   time window.
    ##
    ## Returns silently when the legacy store client is not mounted.
    if node.wakuLegacyStoreClient.isNil():
      return

    let retrieved = (await node.wakuLegacyStoreClient.resume(peerList)).valueOr:
      error " failed to resume store ", error = error
      return

    info " the number of retrieved messages since the last online time: ",
      number = retrieved
2022-10-18 09:05:53 -05:00
2024-04-25 09:09:52 -04:00
## Waku Store
2024-07-12 12:19:12 -04:00
proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery =
  ## Translates a store-v3 `StoreQueryRequest` into an `ArchiveQuery`.
  ##
  ## Every field is copied one-to-one; `pageSize` keeps the archive's default
  ## value unless the request carries an explicit pagination limit.
  result = waku_archive.ArchiveQuery(
    includeData: request.includeData,
    pubsubTopic: request.pubsubTopic,
    contentTopics: request.contentTopics,
    startTime: request.startTime,
    endTime: request.endTime,
    hashes: request.messageHashes,
    cursor: request.paginationCursor,
    direction: request.paginationForward,
    requestId: request.requestId,
  )

  if request.paginationLimit.isSome():
    result.pageSize = uint(request.paginationLimit.get())
2024-07-12 12:19:12 -04:00
proc toStoreResult(res: waku_archive.ArchiveResult): StoreQueryResult =
  ## Translates an archive result into a store-v3 query result.
  ##
  ## An archive error becomes a `StoreError` with code 300. On success one
  ## key/value entry is emitted per returned hash; entries are then enriched,
  ## by index, with the message and pubsub topic for as many messages as the
  ## archive returned.
  let archived = res.valueOr:
    return err(StoreError.new(300, " archive error: " & $error))

  var reply = StoreQueryResponse()
  reply.statusCode = 200
  reply.statusDesc = " OK "

  for hash in archived.hashes:
    reply.messages.add(store_common.WakuMessageKeyValue(messageHash: hash))

  # Messages (and their topics) are matched to the hashes by position.
  for idx, msg in archived.messages:
    reply.messages[idx].message = some(msg)
    reply.messages[idx].pubsubTopic = some(archived.topics[idx])

  reply.paginationCursor = archived.cursor

  return ok(reply)
proc mountStore*(
    node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
  ## Creates the store-v3 protocol, backs it with `node.wakuArchive` and
  ## mounts it on the switch.
  ##
  ## Does nothing (apart from logging an error) when the archive has not been
  ## mounted. If the node is already running the protocol is started before
  ## it is mounted.
  if node.wakuArchive.isNil():
    error " failed to mount waku store protocol ", error = " waku archive not set "
    return

  info " mounting waku store protocol "

  let requestHandler: StoreQueryRequestHandler = proc(
      request: StoreQueryRequest
  ): Future[StoreQueryResult] {.async.} =
    let archiveQuery = request.toArchiveQuery()
    let archiveRes = await node.wakuArchive.findMessages(archiveQuery)
    return archiveRes.toStoreResult()

  node.wakuStore =
    store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit))

  if node.started:
    await node.wakuStore.start()

  node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec))
proc mountStoreClient*(node: WakuNode) =
  ## Creates the store-v3 client from the node's peer manager and RNG and
  ## stores it in `node.wakuStoreClient`.
  info " mounting store client "

  let client = store_client.WakuStoreClient.new(node.peerManager, node.rng)
  node.wakuStoreClient = client
proc query*(
    node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo
): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.
    async, gcsafe
.} =
  ## Sends a store-v3 `request` to `peer`.
  ##
  ## Only a missing store client yields an `err`. A failure reported by the
  ## client is still returned as `ok`, wrapped in a `StoreQueryResponse` whose
  ## status code/description carry the error kind and text.
  if node.wakuStoreClient.isNil():
    return err(" waku store v3 client is nil ")

  let queryRes = await node.wakuStoreClient.query(request, peer)
  if queryRes.isOk():
    return ok(queryRes.get())

  let failure = queryRes.error
  var failed = StoreQueryResponse()
  failed.statusCode = uint32(failure.kind)
  failed.statusDesc = $failure
  return ok(failed)
2024-07-30 07:23:39 -04:00
proc setupStoreResume*(node: WakuNode) =
  ## Creates the `StoreResume` helper from the node's peer manager, archive
  ## and store client and stores it in `node.wakuStoreResume`.
  ##
  ## On failure the error is logged and the node is left unchanged.
  let resumeRes =
    StoreResume.new(node.peerManager, node.wakuArchive, node.wakuStoreClient)
  if resumeRes.isErr():
    error " Failed to setup Store Resume ", error = $resumeRes.error
    return

  node.wakuStoreResume = resumeRes.get()
2022-10-27 17:29:09 +02:00
## Waku lightpush
2022-10-18 09:05:53 -05:00
2024-04-15 15:28:35 +02:00
proc mountLightPush*(
    node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
  ## Creates the lightpush protocol and mounts it on the switch.
  ##
  ## The push handler depends on what is already mounted on the node:
  ## * no relay: the handler returned by `getNilPushHandler`;
  ## * relay: the handler returned by `getRelayPushHandler`, which is also
  ##   given the RLN relay when one is mounted.
  ##
  ## If the node is already running the protocol is started before it is
  ## mounted.
  info " mounting light push "

  let pushHandler =
    if not node.wakuRelay.isNil:
      debug " mounting lightpush with relay "
      let rlnPeer =
        if not isNil(node.wakuRlnRelay):
          debug " mounting lightpush with rln-relay "
          some(node.wakuRlnRelay)
        else:
          debug " mounting lightpush without rln-relay "
          none(WakuRLNRelay)
      getRelayPushHandler(node.wakuRelay, rlnPeer)
    else:
      debug " mounting lightpush without relay (nil) "
      getNilPushHandler()

  node.wakuLightPush =
    WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))

  if node.started:
    # Node has started already. Let's start lightpush too.
    await node.wakuLightPush.start()

  node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
2022-11-02 11:59:58 +01:00
proc mountLightPushClient*(node: WakuNode) =
  ## Creates the lightpush client from the node's peer manager and RNG and
  ## stores it in `node.wakuLightpushClient`.
  info " mounting light push client "

  let client = WakuLightPushClient.new(node.peerManager, node.rng)
  node.wakuLightpushClient = client
2024-03-16 00:08:47 +01:00
proc lightpushPublish*(
    node: WakuNode,
    pubsubTopic: Option[PubsubTopic],
    message: WakuMessage,
    peer: RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
  ## Pushes a `WakuMessage` to a node which relays it further on a pubsub
  ## topic, and returns whether that succeeded.
  ##
  ## The lightpush client is preferred when mounted; otherwise the locally
  ## mounted lightpush service handles the request itself. When `pubsubTopic`
  ## is not given it is derived from `message.contentTopic` through the node's
  ## sharding configuration, so the message should carry a `contentTopic`.
  if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
    error " failed to publish message as lightpush not available "
    return err(" Waku lightpush not available ")

  # Publishes on an already resolved pubsub topic through whichever lightpush
  # flavour is mounted (client first, then the self-hosted service).
  let publishOn = proc(
      target: WakuNode, shard: PubsubTopic, msg: WakuMessage, dest: RemotePeerInfo
  ): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
    let hashHex = shard.computeMessageHash(msg).to0xHex()

    if not target.wakuLightpushClient.isNil():
      notice " publishing message with lightpush ",
        pubsubTopic = shard,
        contentTopic = msg.contentTopic,
        target_peer_id = dest.peerId,
        msg_hash = hashHex
      return await target.wakuLightpushClient.publish(shard, msg, dest)

    if not target.wakuLightPush.isNil():
      notice " publishing message with self hosted lightpush ",
        pubsubTopic = shard,
        contentTopic = msg.contentTopic,
        target_peer_id = dest.peerId,
        msg_hash = hashHex
      return await target.wakuLightPush.handleSelfLightPushRequest(shard, msg)

  try:
    if pubsubTopic.isSome():
      return await publishOn(node, pubsubTopic.get(), message, peer)

    let topicMap = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic).valueOr:
      return err(error)

    for pubsub, _ in topicMap.pairs: # There's only one pair anyway
      return await publishOn(node, $pubsub, message, peer)
  except CatchableError as exc:
    return err(exc.msg)
2022-10-28 16:30:01 +02:00
# TODO: Move to application module (e.g., wakunode2.nim)
2024-03-16 00:08:47 +01:00
proc lightpushPublish*(
    node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult[void]] {.
    async, gcsafe, deprecated: " Use ' node.lightpushPublish() ' instead "
.} =
  ## Publishes `message` via lightpush without an explicit destination peer.
  ##
  ## With a lightpush client mounted, a service peer is picked by the peer
  ## manager (failing when none is available). Without a client, the node's
  ## own peer id is used so the self-hosted lightpush service handles it.
  if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
    error " failed to publish message as lightpush not available "
    return err(" waku lightpush not available ")

  let destination =
    if not node.wakuLightpushClient.isNil():
      let selected = node.peerManager.selectPeer(WakuLightPushCodec)
      if selected.isNone():
        let msg = " no suitable remote peers "
        error " failed to publish message ", err = msg
        return err(msg)
      selected.get()
    else:
      # The guard above guarantees the self-hosted lightpush is mounted here.
      RemotePeerInfo.init($node.switch.peerInfo.peerId)

  let publishRes = await node.lightpushPublish(pubsubTopic, message, peer = destination)
  if publishRes.isErr():
    error " failed to publish message ", error = publishRes.error

  return publishRes
2023-04-05 09:46:13 +02:00
2022-12-13 14:56:24 +05:30
## Waku RLN Relay
2024-03-16 00:08:47 +01:00
proc mountRlnRelay*(
    node: WakuNode,
    rlnConf: WakuRlnConfig,
    spamHandler = none(SpamHandler),
    registrationHandler = none(RegistrationHandler),
) {.async.} =
  ## Creates the RLN relay from `rlnConf`, registers its validator on the
  ## already mounted relay and stores it in `node.wakuRlnRelay`.
  ##
  ## Raises `CatchableError` when the relay protocol is not mounted or when
  ## the RLN relay cannot be created. A configured user message limit above
  ## the group manager's maximum is only logged as an error; mounting still
  ## proceeds.
  info " mounting rln relay "

  if node.wakuRelay.isNil():
    raise newException(
      CatchableError, " WakuRelay protocol is not mounted, cannot mount WakuRlnRelay "
    )

  # `await` instead of `waitFor`: this proc is itself async, and `waitFor`
  # would block while re-entering the event loop from inside a running future.
  let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler)
  if rlnRelayRes.isErr():
    raise
      newException(CatchableError, " failed to mount WakuRlnRelay: " & rlnRelayRes.error)
  let rlnRelay = rlnRelayRes.get()

  if rlnConf.rlnRelayUserMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit:
    error " rln-relay-user-message-limit can ' t exceed the MAX_MESSAGE_LIMIT in the rln contract "

  let validator = generateRlnValidator(rlnRelay, spamHandler)

  # register rln validator as default validator
  debug " Registering RLN validator "
  node.wakuRelay.addValidator(validator, " RLN validation failed ")

  node.wakuRlnRelay = rlnRelay
2023-01-26 15:48:30 +05:30
2022-10-27 17:29:09 +02:00
## Waku peer-exchange
2024-07-29 15:53:43 -04:00
proc mountPeerExchange*(
    node: WakuNode,
    cluster: Option[uint16] = none(uint16),
    rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit,
) {.async: (raises: []).} =
  ## Creates the peer-exchange protocol and mounts it on the switch.
  ##
  ## If the node is already running the protocol is started first. Failures
  ## while starting or mounting are logged and never propagated.
  info " mounting waku peer exchange "

  let peerExchange = WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit))
  node.wakuPeerExchange = peerExchange

  if node.started:
    try:
      await node.wakuPeerExchange.start()
    except CatchableError as exc:
      error " failed to start wakuPeerExchange ", error = exc.msg

  try:
    node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
  except LPError as exc:
    error " failed to mount wakuPeerExchange ", error = exc.msg
2022-10-18 09:05:53 -05:00
2024-03-14 17:48:09 +01:00
proc fetchPeerExchangePeers*(
    node: WakuNode, amount = DefaultPXNumPeersReq
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
  ## Requests up to `amount` peers through peer exchange and adds every usable
  ## one to the peer manager with origin `PeerExchange`.
  ##
  ## Returns the number of peers added. Fails with `SERVICE_UNAVAILABLE` when
  ## peer exchange is not mounted, or with the status reported by the request
  ## when the request itself fails. Records that cannot be decoded as an ENR
  ## or cannot be converted into a `RemotePeerInfo` are skipped.
  if node.wakuPeerExchange.isNil():
    error " could not get peers from px, waku peer-exchange is nil "
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
        status_desc: some(" PeerExchange is not mounted "),
      )
    )

  info " Retrieving peer info via peer exchange protocol "
  let pxPeersRes = await node.wakuPeerExchange.request(amount)
  if pxPeersRes.isOk:
    var validPeers = 0
    let peers = pxPeersRes.get().peerInfos
    for pi in peers:
      var record: enr.Record
      if enr.fromBytes(record, pi.enr):
        # The record comes from a remote peer: never `.get` the conversion
        # blindly, a failed conversion would raise a Defect and take the node
        # down. Skip the record instead.
        let remotePeer = record.toRemotePeerInfo().valueOr:
          warn "skipping peer exchange record that cannot be converted to peer info",
            error = $error
          continue
        node.peerManager.addPeer(remotePeer, PeerExchange)
        validPeers += 1
    info " Retrieved peer info via peer exchange protocol ",
      validPeers = validPeers, totalPeers = peers.len
    return ok(validPeers)
  else:
    warn " failed to retrieve peer info via peer exchange protocol ",
      error = pxPeersRes.error
    return err(pxPeersRes.error)
2023-02-09 16:59:29 +01:00
2024-10-30 12:51:04 +02:00
proc peerExchangeLoop(node: WakuNode) {.async.} =
  ## Endless loop that, once per minute, fetches peers through peer exchange
  ## as long as the node is started. A failed fetch is only logged.
  while true:
    await sleepAsync(1.minutes)

    if node.started:
      let fetched = await node.fetchPeerExchangePeers()
      if fetched.isErr():
        warn " Cannot fetch peers from peer exchange ", cause = fetched.error
proc startPeerExchangeLoop*(node: WakuNode) =
  ## Launches `peerExchangeLoop` and keeps its future in
  ## `node.wakuPeerExchange.pxLoopHandle`.
  ##
  ## Logs an error and does nothing when peer exchange is not mounted.
  let peerExchange = node.wakuPeerExchange
  if peerExchange.isNil():
    error " startPeerExchangeLoop: Peer Exchange is not mounted "
    return

  node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop()
2022-11-02 09:45:21 +01:00
# TODO: Move to application module (e.g., wakunode2.nim)
2024-03-16 00:08:47 +01:00
proc setPeerExchangePeer*(
    node: WakuNode, peer: RemotePeerInfo | MultiAddress | string
) =
  ## Registers `peer` in the peer manager with origin `PeerExchange` and
  ## increments the `waku_px_peers` metric.
  ##
  ## Logs an error and does nothing when peer exchange is not mounted or when
  ## `peer` cannot be parsed.
  if node.wakuPeerExchange.isNil():
    error " could not set peer, waku peer-exchange is nil "
    return

  info " Set peer-exchange peer ", peer = peer

  let remotePeer = parsePeerInfo(peer).valueOr:
    error " could not parse peer info ", error = error
    return

  node.peerManager.addPeer(remotePeer, PeerExchange)
  waku_px_peers.inc()
2022-10-27 17:29:09 +02:00
## Other protocols
2024-05-16 22:29:11 +02:00
proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
  ## Creates the libp2p ping protocol and mounts it on the switch.
  ##
  ## If the protocol cannot be created the error is logged and nothing is
  ## started or mounted. If the node is already running the protocol is
  ## started before it is mounted; failures while starting or mounting are
  ## logged and never propagated.
  info " mounting libp2p ping protocol "

  try:
    node.libp2pPing = Ping.new(rng = node.rng)
  except Exception:
    error " failed to create ping ", error = getCurrentExceptionMsg()
    # Without a ping instance there is nothing to start or mount; going on
    # would hand a nil protocol to `start`/`mount`.
    return

  if node.started:
    # Node has started already. Let's start ping too.
    try:
      await node.libp2pPing.start()
    except CatchableError:
      error " failed to start libp2pPing ", error = getCurrentExceptionMsg()

  try:
    node.switch.mount(node.libp2pPing)
  except LPError:
    error " failed to mount libp2pPing ", error = getCurrentExceptionMsg()
2022-10-18 09:05:53 -05:00
2022-11-24 14:11:23 +01:00
# TODO: Move this logic to PeerManager
2022-10-18 09:05:53 -05:00
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
  ## Endless loop that pings peers every `keepalive` interval while the node
  ## is started.
  ##
  ## The peers pinged are the second element of
  ## `peerManager.connectedPeers()` (the outgoing peers, going by the
  ## comments below). A peer that cannot be dialed is logged and skipped;
  ## any other failure is counted in `waku_node_errors` under
  ## `keep_alive_failure`.
  while true:
    await sleepAsync(keepalive)
    if not node.started:
      continue

    # Keep connected peers alive while running
    # Each node is responsible of keeping its outgoing connections alive
    trace " Running keepalive "

    # First get a list of connected peer infos
    let outPeers = node.peerManager.connectedPeers()[1]

    for peerId in outPeers:
      try:
        let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
          warn " Failed dialing peer for keep alive ", peerId = peerId
          continue

        # Close the stream even when the ping fails, otherwise it leaks.
        try:
          discard await node.libp2pPing.ping(conn)
        finally:
          await conn.close()
      except CatchableError:
        waku_node_errors.inc(labelValues = [" keep_alive_failure "])
2022-11-21 09:36:41 +01:00
2024-10-30 12:51:04 +02:00
# 2 minutes default - 20% of the default chronosstream timeout duration
proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
  ## Spawns `keepaliveLoop` in the background with the given interval.
  info " starting keepalive ", keepalive = keepalive

  let loopFut = node.keepaliveLoop(keepalive)
  asyncSpawn loopFut
2022-10-18 09:05:53 -05:00
2024-05-16 22:29:11 +02:00
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
  ## Creates the rendezvous discovery protocol and mounts it on the switch.
  ##
  ## If the protocol cannot be created the error is logged and nothing else
  ## happens. If the node is already running the protocol is started before
  ## it is mounted; failures while starting or mounting are logged and never
  ## propagated.
  info " mounting rendezvous discovery protocol "

  try:
    node.rendezvous = RendezVous.new(node.switch)
  except Exception as e:
    error " failed to create rendezvous ", error = e.msg
    return

  if node.started:
    try:
      await node.rendezvous.start()
    except CatchableError as exc:
      error " failed to start rendezvous ", error = exc.msg

  try:
    node.switch.mount(node.rendezvous)
  except LPError as exc:
    error " failed to mount rendezvous ", error = exc.msg
2023-06-01 21:43:10 +02:00
2023-10-27 10:11:47 +03:00
proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
  ## True when the textual form of `inputMultiAdd` contains either of the
  ## two bind-address/port-0 patterns checked below.
  let rendered = $inputMultiAdd
  rendered.contains(" 0.0.0.0/tcp/0 ") or rendered.contains(" 127.0.0.1/tcp/0 ")
2022-11-21 09:36:41 +01:00
2024-10-28 09:17:46 +01:00
proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
  ## Rewrites the node's announced addresses, substituting the two
  ## placeholder IP strings with the primary local IP, then stores the
  ## rewritten list both in `node.announcedAddresses` and in the switch's
  ## `peerInfo.addrs`, and logs the listening / announced addresses and ENR.
  ##
  ## If the primary IP cannot be determined a warning is logged and the
  ## initial `localIp` value is used. Returns an error, without modifying
  ## the node, as soon as one rewritten address is not a valid
  ## `MultiAddress`.
  let peerInfo = node.switch.peerInfo

  var localIp = " 0.0.0.0 "
  try:
    localIp = $getPrimaryIPAddr()
  except Exception as e:
    warn " Could not retrieve localIp ", msg = e.msg

  info " PeerInfo ", peerId = peerInfo.peerId, addrs = peerInfo.addrs

  ## Update the WakuNode addresses
  var announcedStr = " "
  var rewritten: seq[MultiAddress] = @[]
  for address in node.announcedAddresses:
    ## Replace "0.0.0.0" or "127.0.0.1" with the localIp
    let newAddr = ($address).replace(" 0.0.0.0 ", localIp).replace(" 127.0.0.1 ", localIp)
    announcedStr.add(" [ " & $newAddr & " /p2p/ " & $peerInfo.peerId & " ] ")

    let parsed = MultiAddress.init(newAddr)
    if parsed.isErr():
      return err(" error in updateAnnouncedAddrWithPrimaryIpAddr: " & $parsed.error)
    rewritten.add(parsed.get())

  node.announcedAddresses = rewritten

  ## Update the Switch addresses
  node.switch.peerInfo.addrs = rewritten

  var listenStr = " "
  for transport in node.switch.transports:
    for address in transport.addrs:
      listenStr.add(" [ " & $address & " /p2p/ " & $peerInfo.peerId & " ] ")

  info " Listening on ",
    full = listenStr, localIp = localIp, switchAddress = $(node.switch.peerInfo.addrs)
  info " Announcing addresses ", full = announcedStr
  info " DNS: discoverable ENR ", enr = node.enr.toUri()

  ok()
2023-10-27 10:11:47 +03:00
proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku node and all its mounted protocols.
  ##
  ## Order of operations: optional protocols (relay, metadata, store resume,
  ## sync) are started first, then the switch. Afterwards the announced
  ## addresses are rewritten with the primary IP, unless one of them binds
  ## port 0, in which case that step is postponed.
  waku_version.set(1, labelValues = [git_version])
  info " Starting Waku node ", version = git_version

  # Evaluated before anything is started, on the addresses as configured.
  let zeroPortPresent = node.announcedAddresses.anyIt(isBindIpWithZeroPort(it))

  # Perform relay-specific startup tasks TODO: this should be rethought
  if not node.wakuRelay.isNil():
    await node.startRelay()

  if not node.wakuMetadata.isNil():
    node.wakuMetadata.start()

  if not node.wakuStoreResume.isNil():
    await node.wakuStoreResume.start()

  if not node.wakuSync.isNil():
    node.wakuSync.start()

  ## The switch uses this mapper to update peer info addrs
  ## with announced addrs after start
  let addressMapper = proc(
      listenAddrs: seq[MultiAddress]
  ): Future[seq[MultiAddress]] {.async.} =
    return node.announcedAddresses
  node.switch.peerInfo.addressMappers.add(addressMapper)

  ## The switch will update addresses after start using the addressMapper
  await node.switch.start()

  node.started = true

  if zeroPortPresent:
    info " Listening port is dynamically allocated, address and ENR generation postponed "
  else:
    let updateRes = updateAnnouncedAddrWithPrimaryIpAddr(node)
    if updateRes.isErr():
      error " failed update announced addr ", error = $updateRes.error

  info " Node started successfully "
proc stop*(node: WakuNode) {.async.} =
  ## Stops the node.
  ##
  ## The switch is stopped first, followed by the peer manager. Then, each
  ## only when present: the RLN relay, the archive, the store-resume
  ## component and the peer-exchange loop. `node.started` is cleared last.
  ##
  ## An exception raised while stopping the RLN relay is logged and
  ## swallowed so that the remaining components are still shut down.

  ## By stopping the switch we are stopping all the underlying mounted protocols
  await node.switch.stop()

  node.peerManager.stop()

  let rlnRelay = node.wakuRlnRelay
  if not rlnRelay.isNil():
    try:
      await rlnRelay.stop() ## this can raise an exception
    except Exception as exc:
      error " exception stopping the node ", error = exc.msg

  let archive = node.wakuArchive
  if not archive.isNil():
    await archive.stopWait()

  let storeResume = node.wakuStoreResume
  if not storeResume.isNil():
    await storeResume.stopWait()

  # Only cancel the peer-exchange loop when both the protocol and its loop
  # handle exist; `or` short-circuits, so the handle is never read on nil.
  let peerExchange = node.wakuPeerExchange
  if not (peerExchange.isNil() or peerExchange.pxLoopHandle.isNil()):
    await peerExchange.pxLoopHandle.cancelAndWait()

  node.started = false
2023-09-06 14:16:19 +05:30
2023-12-14 07:16:39 +01:00
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
  ## Reports whether the node is ready.
  ##
  ## When an RLN relay is mounted, its own `isReady` result is returned;
  ## without one the node is always considered ready.
  ##
  ## TODO: add other protocol `isReady` checks
  let rlnRelay = node.wakuRlnRelay
  if not rlnRelay.isNil():
    return await rlnRelay.isReady()
  return true
2024-09-18 15:58:07 +02:00
proc setRateLimits*(node: WakuNode, limits: seq[string]): Result[void, string] =
  ## Parses `limits` with `ProtocolRateLimitSettings.parse` and stores the
  ## parsed settings in `node.rateLimitSettings`.
  ##
  ## Returns an error describing the parse failure when `limits` cannot be
  ## parsed; in that case `node.rateLimitSettings` is left unchanged.
  let parsedSettings = ProtocolRateLimitSettings.parse(limits).valueOr:
    return err(" invalid rate limit settings: " & error)

  node.rateLimitSettings = parsedSettings
  return ok()