2021-07-14 17:58:46 +00:00
{. push raises : [ Defect ] . }
2020-04-29 04:49:27 +00:00
import
2021-06-16 20:23:55 +00:00
std / [ options , tables , strutils , sequtils , os ] ,
2021-06-09 14:37:08 +00:00
chronos , chronicles , metrics ,
stew / shims / net as stewNet ,
2021-09-17 17:31:25 +00:00
stew / byteutils ,
2020-09-01 02:09:54 +00:00
eth / keys ,
2021-08-12 08:51:38 +00:00
eth / p2p / discoveryv5 / enr ,
2020-05-15 04:11:14 +00:00
libp2p / crypto / crypto ,
2021-06-15 08:55:47 +00:00
libp2p / protocols / ping ,
2021-04-21 09:36:56 +00:00
libp2p / protocols / pubsub / gossipsub ,
2021-08-25 11:57:35 +00:00
libp2p / nameresolving / dnsresolver ,
2021-06-09 14:37:08 +00:00
libp2p / builders ,
2021-07-13 07:18:51 +00:00
.. / protocol / [ waku_relay , waku_message ] ,
2020-11-24 04:34:32 +00:00
.. / protocol / waku_store / waku_store ,
2020-11-23 02:27:45 +00:00
.. / protocol / waku_swap / waku_swap ,
2021-01-05 04:52:10 +00:00
.. / protocol / waku_filter / waku_filter ,
2021-04-24 04:56:37 +00:00
.. / protocol / waku_lightpush / waku_lightpush ,
2021-09-17 17:31:25 +00:00
.. / protocol / waku_rln_relay / [ waku_rln_relay_types ] ,
2021-01-25 11:03:52 +00:00
.. / utils / peers ,
2021-07-14 17:58:46 +00:00
.. / utils / requests ,
2021-06-25 21:06:56 +00:00
. / storage / migration / migration_types ,
2021-08-12 08:51:38 +00:00
. / peer_manager / peer_manager ,
. / dnsdisc / waku_dnsdisc
2020-10-21 09:54:29 +00:00
2021-07-16 15:13:36 +00:00
export
builders ,
waku_relay , waku_message ,
waku_store ,
waku_swap ,
waku_filter ,
waku_lightpush ,
waku_rln_relay_types
2021-06-08 18:56:32 +00:00
when defined ( rln ) :
2021-07-21 09:37:10 +00:00
import
libp2p / protocols / pubsub / rpc / messages ,
web3 ,
2021-09-17 17:31:25 +00:00
.. / protocol / waku_rln_relay / [ rln , waku_rln_relay_utils , waku_rln_relay_utils ]
2021-06-08 18:56:32 +00:00
2021-01-29 08:42:41 +00:00
declarePublicCounter waku_node_messages , " number of messages received " , [ " type " ]
declarePublicGauge waku_node_filters , " number of content filter subscriptions "
declarePublicGauge waku_node_errors , " number of wakunode errors " , [ " type " ]
2020-09-16 04:23:10 +00:00
logScope :
topics = " wakunode "
2020-09-11 04:16:45 +00:00
# Default clientId
const clientId * = " Nimbus Waku v2 node "
2020-04-29 04:49:27 +00:00
2021-03-11 07:48:59 +00:00
# Default topic
const defaultTopic = " /waku/2/default-waku/proto "
2020-04-29 04:49:27 +00:00
# key and crypto modules different
type
KeyPair * = crypto . KeyPair
PublicKey * = crypto . PublicKey
PrivateKey * = crypto . PrivateKey
2021-01-06 09:35:05 +00:00
# XXX: Weird type, should probably be using pubsub Topic object name?
Topic * = string
2020-07-29 13:24:01 +00:00
Message * = seq [ byte ]
2021-01-06 09:35:05 +00:00
WakuInfo * = object
# NOTE One for simplicity, can extend later as needed
listenStr * : string
#multiaddrStrings*: seq[string]
2020-11-24 04:34:32 +00:00
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode * = ref object of RootObj
2021-02-04 10:32:58 +00:00
peerManager * : PeerManager
2020-11-24 04:34:32 +00:00
switch * : Switch
wakuRelay * : WakuRelay
wakuStore * : WakuStore
wakuFilter * : WakuFilter
wakuSwap * : WakuSwap
2021-02-22 17:40:02 +00:00
wakuRlnRelay * : WakuRLNRelay
2021-04-24 04:56:37 +00:00
wakuLightPush * : WakuLightPush
2020-11-24 04:34:32 +00:00
peerInfo * : PeerInfo
2021-08-12 08:51:38 +00:00
enr * : enr . Record
2021-06-15 08:55:47 +00:00
libp2pPing * : Ping
2020-11-24 04:34:32 +00:00
libp2pTransportLoops * : seq [ Future [ void ] ]
filters * : Filters
rng * : ref BrHmacDrbgContext
2021-04-01 09:37:14 +00:00
started * : bool # Indicates that node has started listening
2020-11-24 04:34:32 +00:00
2021-07-27 06:48:56 +00:00
proc protocolMatcher ( codec : string ) : Matcher =
2021-07-02 08:49:41 +00:00
## Returns a protocol matcher function for the provided codec
proc match ( proto : string ) : bool {. gcsafe . } =
## Matches a proto with any postfix to the provided codec.
## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
return proto . startsWith ( codec )
2021-07-27 06:48:56 +00:00
return match
2020-11-10 07:13:16 +00:00
proc removeContentFilters ( filters : var Filters , contentFilters : seq [ ContentFilter ] ) {. gcsafe . } =
# Flatten all unsubscribe topics into single seq
2021-05-05 08:34:40 +00:00
let unsubscribeTopics = contentFilters . mapIt ( it . contentTopic )
2020-11-10 07:13:16 +00:00
debug " unsubscribing " , unsubscribeTopics = unsubscribeTopics
var rIdToRemove : seq [ string ] = @ [ ]
for rId , f in filters . mpairs :
# Iterate filter entries to remove matching content topics
2021-05-05 08:34:40 +00:00
2020-11-10 07:13:16 +00:00
# make sure we delete the content filter
# if no more topics are left
2021-05-05 08:34:40 +00:00
f . contentFilters . keepIf ( proc ( cf : auto ) : bool = cf . contentTopic notin unsubscribeTopics )
2020-11-10 07:13:16 +00:00
if f . contentFilters . len = = 0 :
rIdToRemove . add ( rId )
# make sure we delete the filter entry
# if no more content filters left
for rId in rIdToRemove :
filters . del ( rId )
debug " filters modified " , filters = filters
2020-07-24 01:39:58 +00:00
template tcpEndPoint ( address , port ) : auto =
MultiAddress . init ( address , tcpProtocol , port )
2020-09-01 02:09:54 +00:00
## Public API
##
2021-07-14 17:58:46 +00:00
proc new * ( T : type WakuNode , nodeKey : crypto . PrivateKey ,
2020-09-01 02:09:54 +00:00
bindIp : ValidIpAddress , bindPort : Port ,
2021-03-26 08:49:51 +00:00
extIp = none [ ValidIpAddress ] ( ) , extPort = none [ Port ] ( ) ,
2021-07-14 17:58:46 +00:00
peerStorage : PeerStorage = nil ) : T
{. raises : [ Defect , LPError ] . } =
2020-10-06 03:33:28 +00:00
## Creates a Waku Node.
##
## Status: Implemented.
##
2020-04-29 04:49:27 +00:00
let
2020-10-14 03:32:37 +00:00
rng = crypto . newRng ( )
2021-10-06 17:03:00 +00:00
hostAddress = tcpEndPoint ( bindIp , bindPort )
2020-09-01 02:09:54 +00:00
announcedAddresses = if extIp . isNone ( ) or extPort . isNone ( ) : @ [ ]
else : @ [ tcpEndPoint ( extIp . get ( ) , extPort . get ( ) ) ]
2020-05-21 04:16:58 +00:00
peerInfo = PeerInfo . init ( nodekey )
2021-08-12 08:51:38 +00:00
enrIp = if extIp . isSome ( ) : extIp
else : some ( bindIp )
enrTcpPort = if extPort . isSome ( ) : extPort
else : some ( bindPort )
enr = createEnr ( nodeKey , enrIp , enrTcpPort , none ( Port ) )
2020-09-01 02:09:54 +00:00
info " Initializing networking " , hostAddress ,
announcedAddresses
2020-07-24 01:39:58 +00:00
# XXX: Add this when we create node or start it?
2021-01-15 07:37:20 +00:00
peerInfo . addrs . add ( hostAddress ) # Index 0
for multiaddr in announcedAddresses :
peerInfo . addrs . add ( multiaddr ) # Announced addresses in index > 0
2021-03-26 08:49:51 +00:00
2020-10-14 03:32:37 +00:00
var switch = newStandardSwitch ( some ( nodekey ) , hostAddress ,
transportFlags = { ServerFlags . ReuseAddr } , rng = rng )
2020-09-16 04:23:10 +00:00
# TODO Untested - verify behavior after switch interface change
# More like this:
# let pubsub = GossipSub.init(
# switch = switch,
# msgIdProvider = msgIdProvider,
# triggerSelf = true, sign = false,
# verifySignature = false).PubSub
2021-07-14 17:58:46 +00:00
let wakuNode = WakuNode (
2021-03-26 08:49:51 +00:00
peerManager : PeerManager . new ( switch , peerStorage ) ,
2020-09-28 21:44:14 +00:00
switch : switch ,
2020-10-14 03:32:37 +00:00
rng : rng ,
2020-09-17 20:10:41 +00:00
peerInfo : peerInfo ,
2021-08-12 08:51:38 +00:00
enr : enr ,
2020-10-02 12:48:56 +00:00
filters : initTable [ string , Filter ] ( )
2020-09-17 20:10:41 +00:00
)
2020-09-11 11:28:27 +00:00
2021-07-14 17:58:46 +00:00
return wakuNode
2021-04-13 05:56:49 +00:00
proc subscribe ( node : WakuNode , topic : Topic , handler : Option [ TopicHandler ] ) =
2021-04-29 04:54:31 +00:00
if node . wakuRelay . isNil :
error " Invalid API call to `subscribe`. WakuRelay not mounted. "
# @TODO improved error handling
return
2021-04-13 05:56:49 +00:00
info " subscribe " , topic = topic
proc defaultHandler ( topic : string , data : seq [ byte ] ) {. async , gcsafe . } =
# A default handler should be registered for all topics
trace " Hit default handler " , topic = topic , data = data
let msg = WakuMessage . init ( data )
if msg . isOk ( ) :
2021-07-13 07:18:51 +00:00
# Notify mounted protocols of new message
if ( not node . wakuFilter . isNil ) :
await node . wakuFilter . handleMessage ( topic , msg . value ( ) )
if ( not node . wakuStore . isNil ) :
await node . wakuStore . handleMessage ( topic , msg . value ( ) )
2021-04-13 05:56:49 +00:00
waku_node_messages . inc ( labelValues = [ " relay " ] )
let wakuRelay = node . wakuRelay
if topic notin PubSub ( wakuRelay ) . topics :
# Add default handler only for new topics
debug " Registering default handler " , topic = topic
wakuRelay . subscribe ( topic , defaultHandler )
if handler . isSome :
debug " Registering handler " , topic = topic
wakuRelay . subscribe ( topic , handler . get ( ) )
2021-02-02 11:33:59 +00:00
proc subscribe * ( node : WakuNode , topic : Topic , handler : TopicHandler ) =
2020-07-27 09:01:06 +00:00
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
2020-08-27 10:15:46 +00:00
## this topic. TopicHandler is a method that takes a topic and some data.
2020-07-27 09:01:06 +00:00
##
2020-08-27 10:15:46 +00:00
## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented.
2021-04-13 05:56:49 +00:00
node . subscribe ( topic , some ( handler ) )
2020-07-27 09:01:06 +00:00
2020-10-02 12:48:56 +00:00
proc subscribe * ( node : WakuNode , request : FilterRequest , handler : ContentFilterHandler ) {. async , gcsafe . } =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## FilterHandler is a method that takes a MessagePush.
##
## Status: Implemented.
2020-11-10 07:13:16 +00:00
# Sanity check for well-formed subscribe FilterRequest
doAssert ( request . subscribe , " invalid subscribe request " )
2020-10-02 12:48:56 +00:00
info " subscribe content " , filter = request
2020-07-27 09:01:06 +00:00
2020-10-20 02:36:27 +00:00
var id = generateRequestId ( node . rng )
2021-02-08 09:17:20 +00:00
2020-10-20 02:36:27 +00:00
if node . wakuFilter . isNil = = false :
2021-02-08 09:17:20 +00:00
let idOpt = await node . wakuFilter . subscribe ( request )
2020-07-27 09:01:06 +00:00
2021-02-08 09:17:20 +00:00
if idOpt . isSome ( ) :
# Subscribed successfully.
id = idOpt . get ( )
else :
# Failed to subscribe
error " remote subscription to filter failed " , filter = request
waku_node_errors . inc ( labelValues = [ " subscribe_filter_failure " ] )
# Register handler for filter, whether remote subscription succeeded or not
node . filters [ id ] = Filter ( contentFilters : request . contentFilters , handler : handler )
2021-01-29 08:42:41 +00:00
waku_node_filters . set ( node . filters . len . int64 )
2021-02-02 11:33:59 +00:00
proc unsubscribe * ( node : WakuNode , topic : Topic , handler : TopicHandler ) =
2020-10-27 01:13:56 +00:00
## Unsubscribes a handler from a PubSub topic.
2020-07-27 09:01:06 +00:00
##
2020-10-27 01:13:56 +00:00
## Status: Implemented.
2021-04-29 04:54:31 +00:00
if node . wakuRelay . isNil :
error " Invalid API call to `unsubscribe`. WakuRelay not mounted. "
# @TODO improved error handling
return
2020-10-27 01:13:56 +00:00
info " unsubscribe " , topic = topic
let wakuRelay = node . wakuRelay
2021-02-02 11:33:59 +00:00
wakuRelay . unsubscribe ( @ [ ( topic , handler ) ] )
2020-10-27 01:13:56 +00:00
2021-02-02 11:33:59 +00:00
proc unsubscribeAll * ( node : WakuNode , topic : Topic ) =
2020-10-27 01:13:56 +00:00
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
2021-04-29 04:54:31 +00:00
if node . wakuRelay . isNil :
error " Invalid API call to `unsubscribeAll`. WakuRelay not mounted. "
# @TODO improved error handling
return
2020-10-27 01:13:56 +00:00
info " unsubscribeAll " , topic = topic
let wakuRelay = node . wakuRelay
2021-02-02 11:33:59 +00:00
wakuRelay . unsubscribeAll ( topic )
2020-10-27 01:13:56 +00:00
2020-07-27 09:01:06 +00:00
2020-11-10 07:13:16 +00:00
proc unsubscribe * ( node : WakuNode , request : FilterRequest ) {. async , gcsafe . } =
2020-07-27 09:01:06 +00:00
## Unsubscribe from a content filter.
##
2020-11-10 07:13:16 +00:00
## Status: Implemented.
# Sanity check for well-formed unsubscribe FilterRequest
doAssert ( request . subscribe = = false , " invalid unsubscribe request " )
info " unsubscribe content " , filter = request
await node . wakuFilter . unsubscribe ( request )
node . filters . removeContentFilters ( request . contentFilters )
2020-09-01 15:20:38 +00:00
2021-01-29 08:42:41 +00:00
waku_node_filters . set ( node . filters . len . int64 )
2020-11-16 09:55:49 +00:00
2021-03-16 18:18:40 +00:00
proc publish * ( node : WakuNode , topic : Topic , message : WakuMessage , rlnRelayEnabled : bool = false ) {. async , gcsafe . } =
2020-09-01 15:20:38 +00:00
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
2020-07-27 09:01:06 +00:00
##
2020-09-01 15:20:38 +00:00
## Status: Implemented.
2021-03-16 18:18:40 +00:00
## When rlnRelayEnabled is true, a zkp will be generated and attached to the message (it is an experimental feature)
2021-04-29 04:54:31 +00:00
if node . wakuRelay . isNil :
error " Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead. "
# @TODO improved error handling
return
2020-07-27 09:01:06 +00:00
2020-09-16 04:23:10 +00:00
let wakuRelay = node . wakuRelay
2020-09-02 03:15:25 +00:00
debug " publish " , topic = topic , contentTopic = message . contentTopic
2021-03-16 18:18:40 +00:00
var publishingMessage = message
2021-06-08 18:56:32 +00:00
when defined ( rln ) :
if rlnRelayEnabled :
# if rln relay is enabled then a proof must be generated and added to the waku message
let
proof = proofGen ( message . payload )
## TODO here since the message is immutable we have to make a copy of it and then attach the proof to its duplicate
## TODO however, it might be better to change message type to mutable (i.e., var) so that we can add the proof field to the original message
publishingMessage = WakuMessage ( payload : message . payload , contentTopic : message . contentTopic , version : message . version , proof : proof )
2021-03-16 18:18:40 +00:00
2020-09-02 03:15:25 +00:00
let data = message . encode ( ) . buffer
2020-12-02 08:40:53 +00:00
discard await wakuRelay . publish ( topic , data )
2020-07-27 09:01:06 +00:00
2021-04-24 04:56:37 +00:00
proc lightpush * ( node : WakuNode , topic : Topic , message : WakuMessage , handler : PushResponseHandler ) {. async , gcsafe . } =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not in `handler`.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality. This field may be also be omitted.
##
## Status: Implemented.
debug " Publishing with lightpush " , topic = topic , contentTopic = message . contentTopic
let rpc = PushRequest ( pubSubTopic : topic , message : message )
await node . wakuLightPush . request ( rpc , handler )
2020-10-06 03:33:28 +00:00
proc query * ( node : WakuNode , query : HistoryQuery , handler : QueryHandlerFunc ) {. async , gcsafe . } =
2020-09-24 02:16:25 +00:00
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
## QueryHandlerFunc is a method that takes a HistoryResponse.
2020-07-27 09:01:06 +00:00
##
2020-09-24 02:16:25 +00:00
## Status: Implemented.
2020-10-06 03:33:28 +00:00
2020-11-24 04:53:42 +00:00
# TODO Once waku swap is less experimental, this can simplified
2020-11-21 05:31:48 +00:00
if node . wakuSwap . isNil :
debug " Using default query "
await node . wakuStore . query ( query , handler )
else :
2020-11-16 09:55:49 +00:00
debug " Using SWAPAccounting query "
2020-11-24 04:53:42 +00:00
# TODO wakuSwap now part of wakuStore object
await node . wakuStore . queryWithAccounting ( query , handler )
2020-11-16 09:55:49 +00:00
2021-10-06 12:29:08 +00:00
proc resume * ( node : WakuNode , peerList : Option [ seq [ RemotePeerInfo ] ] = none ( seq [ RemotePeerInfo ] ) ) {. async , gcsafe . } =
2021-05-26 19:33:22 +00:00
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the the wakuStore's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if not node . wakuStore . isNil :
let retrievedMessages = await node . wakuStore . resume ( peerList )
if retrievedMessages . isOk :
info " the number of retrieved messages since the last online time: " , number = retrievedMessages . value
2020-10-06 03:33:28 +00:00
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info * ( node : WakuNode ) : WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
##
## Status: Implemented.
##
# TODO Generalize this for other type of multiaddresses
let peerInfo = node . peerInfo
2021-01-15 07:37:20 +00:00
let listenStr = $ peerInfo . addrs [ ^ 1 ] & " /p2p/ " & $ peerInfo . peerId
2020-10-06 03:33:28 +00:00
let wakuInfo = WakuInfo ( listenStr : listenStr )
return wakuInfo
2020-07-27 09:01:06 +00:00
2021-07-14 17:58:46 +00:00
proc mountFilter * ( node : WakuNode ) {. raises : [ Defect , KeyError , LPError ] } =
2020-10-20 02:36:27 +00:00
info " mounting filter "
2021-07-14 17:58:46 +00:00
proc filterHandler ( requestId : string , msg : MessagePush )
{. gcsafe , raises : [ Defect , KeyError ] . } =
2020-10-20 02:36:27 +00:00
info " push received "
for message in msg . messages :
2021-04-22 13:45:13 +00:00
node . filters . notify ( message , requestId ) # Trigger filter handlers on a light node
2021-01-29 08:42:41 +00:00
waku_node_messages . inc ( labelValues = [ " filter " ] )
2020-10-20 02:36:27 +00:00
2021-02-08 09:17:20 +00:00
node . wakuFilter = WakuFilter . init ( node . peerManager , node . rng , filterHandler )
2021-07-02 08:49:41 +00:00
node . switch . mount ( node . wakuFilter , protocolMatcher ( WakuFilterCodec ) )
2020-10-20 02:36:27 +00:00
2020-11-24 04:53:42 +00:00
# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
2021-07-14 17:58:46 +00:00
proc mountSwap * ( node : WakuNode , swapConfig : SwapConfig = SwapConfig . init ( ) ) {. raises : [ Defect , LPError ] . } =
2021-06-15 02:06:36 +00:00
info " mounting swap " , mode = $ swapConfig . mode
node . wakuSwap = WakuSwap . init ( node . peerManager , node . rng , swapConfig )
2021-07-02 08:49:41 +00:00
node . switch . mount ( node . wakuSwap , protocolMatcher ( WakuSwapCodec ) )
2020-11-18 12:45:51 +00:00
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
2021-07-14 17:58:46 +00:00
proc mountStore * ( node : WakuNode , store : MessageStore = nil , persistMessages : bool = false ) {. raises : [ Defect , LPError ] . } =
2020-11-24 04:53:42 +00:00
info " mounting store "
if node . wakuSwap . isNil :
debug " mounting store without swap "
2021-07-13 07:18:51 +00:00
node . wakuStore = WakuStore . init ( node . peerManager , node . rng , store , persistMessages = persistMessages )
2020-11-24 04:53:42 +00:00
else :
debug " mounting store with swap "
2021-07-13 07:18:51 +00:00
node . wakuStore = WakuStore . init ( node . peerManager , node . rng , store , node . wakuSwap , persistMessages = persistMessages )
2020-11-24 04:53:42 +00:00
2021-07-02 08:49:41 +00:00
node . switch . mount ( node . wakuStore , protocolMatcher ( WakuStoreCodec ) )
2021-09-24 17:32:45 +00:00
2021-06-08 18:56:32 +00:00
when defined ( rln ) :
2021-09-24 17:32:45 +00:00
proc addRLNRelayValidator * ( node : WakuNode , pubsubTopic : string ) =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets message validator on the given pubsubTopic, the validator will check that
## all the messages published in the pubsubTopic have a valid zero-knowledge proof
proc validator ( topic : string , message : messages . Message ) : Future [ ValidationResult ] {. async . } =
let msg = WakuMessage . init ( message . data )
if msg . isOk ( ) :
# check the proof
if proofVrfy ( msg . value ( ) . payload , msg . value ( ) . proof ) :
return ValidationResult . Accept
# set a validator for the pubsubTopic
let pb = PubSub ( node . wakuRelay )
pb . addValidator ( pubsubTopic , validator )
2021-07-14 17:58:46 +00:00
proc mountRlnRelay * ( node : WakuNode ,
2021-08-26 23:14:51 +00:00
ethClientAddrOpt : Option [ string ] = none ( string ) ,
ethAccAddrOpt : Option [ Address ] = none ( Address ) ,
memContractAddOpt : Option [ Address ] = none ( Address ) ,
groupOpt : Option [ seq [ IDCommitment ] ] = none ( seq [ IDCommitment ] ) ,
memKeyPairOpt : Option [ MembershipKeyPair ] = none ( MembershipKeyPair ) ,
2021-09-24 17:32:45 +00:00
memIndexOpt : Option [ MembershipIndex ] = none ( MembershipIndex ) ,
onchainMode : bool = true ,
pubsubTopic : string ) {. async . } =
2021-06-08 18:56:32 +00:00
# TODO return a bool value to indicate the success of the call
# check whether inputs are provided
2021-09-24 17:32:45 +00:00
# relay protocol is the prerequisite of rln-relay
if node . wakuRelay . isNil :
error " Failed to mount WakuRLNRelay. Relay protocol is not mounted. "
return
# check whether the pubsub topic is supported at the relay level
if pubsubTopic notin node . wakuRelay . defaultTopics :
error " Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic. " , pubsubTopic = pubsubTopic
return
2021-09-17 17:31:25 +00:00
if onchainMode :
if memContractAddOpt . isNone ( ) :
error " failed to mount rln relay: membership contract address is not provided "
return
if ethClientAddrOpt . isNone ( ) :
error " failed to mount rln relay: Ethereum client address is not provided "
return
if ethAccAddrOpt . isNone ( ) :
error " failed to mount rln relay: Ethereum account address is not provided "
return
else :
if groupOpt . isNone ( ) :
error " failed to mount rln relay: group information is not provided "
return
2021-08-26 23:14:51 +00:00
if memKeyPairOpt . isNone ( ) :
2021-09-17 17:31:25 +00:00
error " failed to mount rln relay: membership key of the node is not provided "
2021-08-26 23:14:51 +00:00
return
if memIndexOpt . isNone ( ) :
2021-09-17 17:31:25 +00:00
error " failed to mount rln relay: membership index is not provided "
2021-08-26 23:14:51 +00:00
return
2021-09-17 17:31:25 +00:00
var
ethClientAddr : string
ethAccAddr : Address
memContractAdd : Address
if onchainMode :
2021-08-26 23:14:51 +00:00
ethClientAddr = ethClientAddrOpt . get ( )
ethAccAddr = ethAccAddrOpt . get ( )
memContractAdd = memContractAddOpt . get ( )
2021-09-17 17:31:25 +00:00
let
2021-08-26 23:14:51 +00:00
group = groupOpt . get ( )
memKeyPair = memKeyPairOpt . get ( )
memIndex = memIndexOpt . get ( )
# check the peer's index and the inclusion of user's identity commitment in the group
doAssert ( ( memKeyPair . idCommitment ) = = group [ int ( memIndex ) ] )
2021-06-08 18:56:32 +00:00
# create an RLN instance
2021-09-17 17:31:25 +00:00
var rlnInstance = createRLNInstance ( )
2021-08-24 19:25:29 +00:00
doAssert ( rlnInstance . isOk )
2021-08-26 23:14:51 +00:00
var rln = rlnInstance . value
# generate the membership keys if none is provided
2021-09-17 17:31:25 +00:00
# in a happy path, this condition never gets through for a static group of users
2021-08-26 23:14:51 +00:00
# the node should pass its keys i.e., memKeyPairOpt to the function
if not memKeyPairOpt . isSome :
let membershipKeyPair = rln . membershipKeyGen ( )
# check whether keys are generated
doAssert ( membershipKeyPair . isSome ( ) )
debug " the membership key for the rln relay is generated " , idKey = membershipKeyPair . get ( ) . idKey . toHex , idCommitment = membershipKeyPair . get ( ) . idCommitment . toHex
# add members to the Merkle tree
for index in 0 .. group . len - 1 :
let member = group [ index ]
let member_is_added = rln . insertMember ( member )
doAssert ( member_is_added )
# create the WakuRLNRelay
var rlnPeer = WakuRLNRelay ( membershipKeyPair : memKeyPair ,
membershipIndex : memIndex ,
membershipContractAddress : memContractAdd ,
ethClientAddress : ethClientAddr ,
ethAccountAddress : ethAccAddr ,
rlnInstance : rln )
2021-06-08 18:56:32 +00:00
2021-09-17 17:31:25 +00:00
if onchainMode :
# register the rln-relay peer to the membership contract
let is_successful = await rlnPeer . register ( )
# check whether registration is done
doAssert ( is_successful )
debug " peer is successfully registered into the membership contract "
2021-06-08 18:56:32 +00:00
2021-09-24 17:32:45 +00:00
# adds a topic validator for the supplied pubsub topic at the relay protocol
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
# the topic validator checks for the correct non-spamming proof of the message
addRLNRelayValidator ( node , pubsubTopic )
debug " rln relay topic validator is mounted successfully " , pubsubTopic = pubsubTopic
2021-06-08 18:56:32 +00:00
node . wakuRlnRelay = rlnPeer
2021-07-21 09:37:10 +00:00
2021-07-22 09:46:54 +00:00
proc startRelay * ( node : WakuNode ) {. async . } =
if node . wakuRelay . isNil :
trace " Failed to start relay. Not mounted. "
return
## Setup and start relay protocol
info " starting relay "
# Topic subscriptions
for topic in node . wakuRelay . defaultTopics :
node . subscribe ( topic , none ( TopicHandler ) )
# Resume previous relay connections
2021-07-27 06:48:56 +00:00
if node . peerManager . hasPeers ( protocolMatcher ( WakuRelayCodec ) ) :
2021-07-22 09:46:54 +00:00
info " Found previous WakuRelay peers. Reconnecting. "
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node . wakuRelay . parameters . pruneBackoff + chronos . seconds ( BackoffSlackTime )
await node . peerManager . reconnectPeers ( WakuRelayCodec ,
2021-07-27 06:48:56 +00:00
protocolMatcher ( WakuRelayCodec ) ,
2021-07-22 09:46:54 +00:00
backoffPeriod )
# Start the WakuRelay protocol
await node . wakuRelay . start ( )
info " relay started successfully "
2021-05-04 12:11:41 +00:00
proc mountRelay * ( node : WakuNode ,
topics : seq [ string ] = newSeq [ string ] ( ) ,
2021-05-24 11:19:33 +00:00
relayMessages = true ,
2021-07-14 17:58:46 +00:00
triggerSelf = true )
# @TODO: Better error handling: CatchableError is raised by `waitFor`
{. gcsafe , raises : [ Defect , InitializationError , LPError , CatchableError ] . } =
2020-10-20 02:36:27 +00:00
let wakuRelay = WakuRelay . init (
switch = node . switch ,
# Use default
#msgIdProvider = msgIdProvider,
2021-05-24 11:19:33 +00:00
triggerSelf = triggerSelf ,
2020-10-20 02:36:27 +00:00
sign = false ,
verifySignature = false
)
2021-05-04 12:11:41 +00:00
2021-09-24 17:32:45 +00:00
info " mounting relay " , relayMessages = relayMessages
2020-10-20 02:36:27 +00:00
2021-07-22 09:46:54 +00:00
## The default relay topics is the union of
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay . defaultTopics = concat ( @ [ defaultTopic ] , topics )
2021-07-02 08:49:41 +00:00
node . switch . mount ( wakuRelay , protocolMatcher ( WakuRelayCodec ) )
2020-10-20 02:36:27 +00:00
2021-07-22 09:46:54 +00:00
if relayMessages :
2021-05-04 12:11:41 +00:00
## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes).
## All nodes, however, currently require WakuRelay, regardless of desired capabilities.
## This is to allow protocol stream negotation with relay-capable nodes to succeed.
## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub
## topics. We also never start the relay protocol. node.wakuRelay remains nil.
2021-07-22 09:46:54 +00:00
## @TODO: in future, this WakuRelay dependency will be removed completely
node . wakuRelay = wakuRelay
2021-09-17 17:31:25 +00:00
2021-07-22 09:46:54 +00:00
info " relay mounted successfully "
2021-04-01 09:37:14 +00:00
2021-07-22 09:46:54 +00:00
if node . started :
# Node has started already. Let's start relay too.
waitFor node . startRelay ( )
2021-02-22 17:40:02 +00:00
2021-07-14 17:58:46 +00:00
proc mountLightPush * ( node : WakuNode ) {. raises : [ Defect , LPError ] . } =
2021-04-24 04:56:37 +00:00
info " mounting light push "
if node . wakuRelay . isNil :
debug " mounting lightpush without relay "
node . wakuLightPush = WakuLightPush . init ( node . peerManager , node . rng , nil )
else :
debug " mounting lightpush with relay "
node . wakuLightPush = WakuLightPush . init ( node . peerManager , node . rng , nil , node . wakuRelay )
2021-07-02 08:49:41 +00:00
node . switch . mount ( node . wakuLightPush , protocolMatcher ( WakuLightPushCodec ) )
2021-04-24 04:56:37 +00:00
2021-07-14 17:58:46 +00:00
proc mountLibp2pPing * ( node : WakuNode ) {. raises : [ Defect , LPError ] . } =
2021-06-15 08:55:47 +00:00
info " mounting libp2p ping protocol "
2021-06-02 07:53:34 +00:00
2021-07-14 17:58:46 +00:00
try :
node . libp2pPing = Ping . new ( rng = node . rng )
except Exception as e :
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
# @TODO: remove exception handling once explicit `raises` in ping module
2021-07-22 09:46:54 +00:00
raise newException ( LPError , " Failed to initialize ping protocol " )
2021-06-02 07:53:34 +00:00
2021-06-15 08:55:47 +00:00
node . switch . mount ( node . libp2pPing )
2021-06-02 07:53:34 +00:00
proc keepaliveLoop ( node : WakuNode , keepalive : chronos . Duration ) {. async . } =
while node . started :
2021-06-15 08:55:47 +00:00
# Keep all connected peers alive while running
2021-06-02 07:53:34 +00:00
trace " Running keepalive "
2021-06-15 08:55:47 +00:00
# First get a list of connected peer infos
let peers = node . peerManager . peers ( )
. filterIt ( node . peerManager . connectedness ( it . peerId ) = = Connected )
2021-10-06 12:29:08 +00:00
. mapIt ( it . toRemotePeerInfo ( ) )
2021-06-15 08:55:47 +00:00
# Attempt to retrieve and ping the active outgoing connection for each peer
for peer in peers :
let connOpt = await node . peerManager . dialPeer ( peer , PingCodec )
if connOpt . isNone :
# @TODO more sophisticated error handling here
debug " failed to connect to remote peer " , peer = peer
waku_node_errors . inc ( labelValues = [ " keep_alive_failure " ] )
return
discard await node . libp2pPing . ping ( connOpt . get ( ) ) # Ping connection
2021-06-02 07:53:34 +00:00
await sleepAsync ( keepalive )
proc startKeepalive * ( node : WakuNode ) =
let defaultKeepalive = 5 . minutes # 50% of the default chronosstream timeout duration
info " starting keepalive " , keepalive = defaultKeepalive
asyncSpawn node . keepaliveLoop ( defaultKeepalive )
2020-10-21 09:54:29 +00:00
## Helpers
proc dialPeer * ( n : WakuNode , address : string ) {. async . } =
info " dialPeer " , address = address
# XXX: This turns ipfs into p2p, not quite sure why
2021-10-06 12:29:08 +00:00
let remotePeer = parseRemotePeerInfo ( address )
2020-09-25 11:35:32 +00:00
2021-01-25 11:03:52 +00:00
info " Dialing peer " , wireAddr = remotePeer . addrs [ 0 ] , peerId = remotePeer . peerId
2020-10-21 09:54:29 +00:00
# NOTE This is dialing on WakuRelay protocol specifically
2021-02-04 10:32:58 +00:00
discard await n . peerManager . dialPeer ( remotePeer , WakuRelayCodec )
info " Post peerManager dial "
2020-09-11 04:16:45 +00:00
2021-07-14 17:58:46 +00:00
proc setStorePeer * ( n : WakuNode , address : string ) {. raises : [ Defect , ValueError , LPError ] . } =
2021-06-04 10:02:47 +00:00
info " Set store peer " , address = address
2020-09-11 04:16:45 +00:00
2021-10-06 12:29:08 +00:00
let remotePeer = parseRemotePeerInfo ( address )
2020-09-25 11:35:32 +00:00
2020-10-21 09:54:29 +00:00
n . wakuStore . setPeer ( remotePeer )
2020-09-25 11:35:32 +00:00
2021-07-14 17:58:46 +00:00
proc setFilterPeer * ( n : WakuNode , address : string ) {. raises : [ Defect , ValueError , LPError ] . } =
2021-06-04 10:02:47 +00:00
info " Set filter peer " , address = address
2020-09-25 11:35:32 +00:00
2021-10-06 12:29:08 +00:00
let remotePeer = parseRemotePeerInfo ( address )
2020-10-09 13:58:50 +00:00
2020-10-21 09:54:29 +00:00
n . wakuFilter . setPeer ( remotePeer )
2020-10-09 13:58:50 +00:00
2021-07-14 17:58:46 +00:00
proc setLightPushPeer * ( n : WakuNode , address : string ) {. raises : [ Defect , ValueError , LPError ] . } =
2021-06-04 10:02:47 +00:00
info " Set lightpush peer " , address = address
2021-10-06 12:29:08 +00:00
let remotePeer = parseRemotePeerInfo ( address )
2021-06-04 10:02:47 +00:00
n . wakuLightPush . setPeer ( remotePeer )
2020-10-22 11:12:00 +00:00
proc connectToNodes * ( n : WakuNode , nodes : seq [ string ] ) {. async . } =
2020-10-21 09:54:29 +00:00
for nodeId in nodes :
info " connectToNodes " , node = nodeId
# XXX: This seems...brittle
2020-10-22 11:12:00 +00:00
await dialPeer ( n , nodeId )
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
# fully connected.
#
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
# later.
await sleepAsync ( 5 . seconds )
2021-10-06 12:29:08 +00:00
proc connectToNodes * ( n : WakuNode , nodes : seq [ RemotePeerInfo ] ) {. async . } =
for remotePeerInfo in nodes :
info " connectToNodes " , peer = remotePeerInfo
discard await n . peerManager . dialPeer ( remotePeerInfo , WakuRelayCodec )
2020-10-22 11:12:00 +00:00
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
# fully connected.
#
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
# later.
await sleepAsync ( 5 . seconds )
2020-10-09 13:58:50 +00:00
2021-07-22 09:46:54 +00:00
proc start * ( node : WakuNode ) {. async . } =
## Starts a created Waku Node and
## all its mounted protocols.
##
## Status: Implemented.
node . libp2pTransportLoops = await node . switch . start ( )
# TODO Get this from WakuNode obj
let peerInfo = node . peerInfo
info " PeerInfo " , peerId = peerInfo . peerId , addrs = peerInfo . addrs
let listenStr = $ peerInfo . addrs [ ^ 1 ] & " /p2p/ " & $ peerInfo . peerId
## XXX: this should be /ip4..., / stripped?
info " Listening on " , full = listenStr
2021-08-12 08:51:38 +00:00
info " Discoverable ENR " , enr = node . enr . toURI ( )
2021-07-22 09:46:54 +00:00
if not node . wakuRelay . isNil :
await node . startRelay ( )
info " Node started successfully "
node . started = true
proc stop * ( node : WakuNode ) {. async . } =
if not node . wakuRelay . isNil :
await node . wakuRelay . stop ( )
await node . switch . stop ( )
node . started = false
2021-07-14 17:58:46 +00:00
{. pop . } # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
2020-10-21 09:54:29 +00:00
when isMainModule :
2021-07-22 09:46:54 +00:00
## Node setup happens in 6 phases:
## 1. Set up storage
## 2. Initialize node
## 3. Mount and initialize configured protocols
## 4. Start node and mounted protocols
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
2020-10-21 09:54:29 +00:00
import
2021-07-22 09:46:54 +00:00
confutils ,
2021-04-15 08:18:14 +00:00
system / ansi_c ,
2021-07-22 09:46:54 +00:00
.. / .. / common / utils / nat ,
. / config ,
. / waku_setup ,
2021-03-25 08:37:11 +00:00
. / storage / message / waku_message_store ,
2021-07-22 09:46:54 +00:00
. / storage / peer / waku_peer_storage
logScope :
topics = " wakunode.setup "
###################
# Setup functions #
###################
# 1/6 Setup storage
proc setupStorage ( conf : WakuNodeConf ) :
SetupResult [ tuple [ pStorage : WakuPeerStorage , mStorage : WakuMessageStore ] ] =
## Setup a SQLite Database for a wakunode based on a supplied
## configuration file and perform all necessary migration.
##
## If config allows, return peer storage and message store
## for use elsewhere.
var
sqliteDatabase : SqliteDatabase
storeTuple : tuple [ pStorage : WakuPeerStorage , mStorage : WakuMessageStore ]
# Setup DB
if conf . dbPath ! = " " :
let dbRes = SqliteDatabase . init ( conf . dbPath )
if dbRes . isErr :
warn " failed to init database " , err = dbRes . error
waku_node_errors . inc ( labelValues = [ " init_db_failure " ] )
return err ( " failed to init database " )
else :
sqliteDatabase = dbRes . value
if not sqliteDatabase . isNil :
# Database initialized. Let's set it up
sqliteDatabase . runMigrations ( conf ) # First migrate what we have
if conf . persistPeers :
# Peer persistence enable. Set up Peer table in storage
let res = WakuPeerStorage . new ( sqliteDatabase )
if res . isErr :
warn " failed to init new WakuPeerStorage " , err = res . error
waku_node_errors . inc ( labelValues = [ " init_store_failure " ] )
else :
storeTuple . pStorage = res . value
if conf . persistMessages :
# Historical message persistence enable. Set up Message table in storage
let res = WakuMessageStore . init ( sqliteDatabase )
if res . isErr :
warn " failed to init WakuMessageStore " , err = res . error
waku_node_errors . inc ( labelValues = [ " init_store_failure " ] )
else :
storeTuple . mStorage = res . value
ok ( storeTuple )
# 2/6 Initialize node
proc initNode ( conf : WakuNodeConf ,
pStorage : WakuPeerStorage = nil ) : SetupResult [ WakuNode ] =
## Setup a basic Waku v2 node based on a supplied configuration
## file. Optionally include persistent peer storage.
## No protocols are mounted yet.
2020-09-11 04:16:45 +00:00
2021-10-12 11:43:01 +00:00
## `udpPort` is only supplied to satisfy underlying APIs but is not
## actually a supported transport.
let udpPort = conf . tcpPort
2020-09-11 04:16:45 +00:00
let
2021-07-22 09:46:54 +00:00
( extIp , extTcpPort , extUdpPort ) = setupNat ( conf . nat ,
clientId ,
Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) ,
2021-10-12 11:43:01 +00:00
Port ( uint16 ( udpPort ) + conf . portsShift ) )
2021-07-22 09:46:54 +00:00
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort = if extIp . isSome ( ) and extTcpPort . isNone ( ) :
some ( Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) )
else :
extTcpPort
node = WakuNode . new ( conf . nodekey ,
conf . listenAddress , Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) ,
extIp , extPort ,
pStorage )
ok ( node )
# 3/6 Mount and initialize configured protocols
proc setupProtocols ( node : var WakuNode ,
conf : WakuNodeConf ,
mStorage : WakuMessageStore = nil ) : SetupResult [ bool ] =
2020-12-24 08:02:30 +00:00
2021-07-22 09:46:54 +00:00
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
2020-12-24 08:02:30 +00:00
2021-07-22 09:46:54 +00:00
# Mount relay on all nodes
mountRelay ( node ,
conf . topics . split ( " " ) ,
2021-09-17 17:31:25 +00:00
relayMessages = conf . relay , # Indicates if node is capable to relay messages
2021-09-24 17:32:45 +00:00
)
2020-12-24 08:02:30 +00:00
2021-07-22 09:46:54 +00:00
# Keepalive mounted on all nodes
mountLibp2pPing ( node )
2020-12-24 08:02:30 +00:00
2021-09-24 17:32:45 +00:00
when defined ( rln ) :
if conf . rlnRelay :
info " WakuRLNRelay is enabled "
# set up rln relay inputs
let ( groupOpt , memKeyPairOpt , memIndexOpt ) = rlnRelaySetUp ( conf . rlnRelayMemIndex )
if memIndexOpt . isNone :
error " failed to mount WakuRLNRelay "
else :
# mount rlnrelay in offline mode (for now)
waitFor node . mountRlnRelay ( groupOpt = groupOpt , memKeyPairOpt = memKeyPairOpt , memIndexOpt = memIndexOpt , onchainMode = false , pubsubTopic = conf . rlnRelayPubsubTopic )
info " membership id key " , idkey = memKeyPairOpt . get ( ) . idKey . toHex
info " membership id commitment key " , idCommitmentkey = memKeyPairOpt . get ( ) . idCommitment . toHex
# check the correct construction of the tree by comparing the calculated root against the expected root
# no error should happen as it is already captured in the unit tests
# TODO have added this check to account for unseen corner cases, will remove it later
let
root = node . wakuRlnRelay . rlnInstance . getMerkleRoot . value . toHex ( )
expectedRoot = STATIC_GROUP_MERKLE_ROOT
if root ! = expectedRoot :
error " root mismatch: something went wrong not in Merkle tree construction "
debug " the calculated root " , root
info " WakuRLNRelay is mounted successfully " , pubsubtopic = conf . rlnRelayPubsubTopic
2021-07-22 09:46:54 +00:00
if conf . swap :
mountSwap ( node )
# TODO Set swap peer, for now should be same as store peer
2020-09-11 04:16:45 +00:00
2021-07-22 09:46:54 +00:00
# Store setup
if ( conf . storenode ! = " " ) or ( conf . store ) :
mountStore ( node , mStorage , conf . persistMessages )
2021-03-26 08:49:51 +00:00
2021-07-22 09:46:54 +00:00
if conf . storenode ! = " " :
setStorePeer ( node , conf . storenode )
2021-03-26 08:49:51 +00:00
2021-07-22 09:46:54 +00:00
# NOTE Must be mounted after relay
if ( conf . lightpushnode ! = " " ) or ( conf . lightpush ) :
mountLightPush ( node )
if conf . lightpushnode ! = " " :
setLightPushPeer ( node , conf . lightpushnode )
# Filter setup. NOTE Must be mounted after relay
if ( conf . filternode ! = " " ) or ( conf . filter ) :
mountFilter ( node )
2021-06-25 21:06:56 +00:00
2021-07-22 09:46:54 +00:00
if conf . filternode ! = " " :
setFilterPeer ( node , conf . filternode )
ok ( true ) # Success
2021-03-26 08:49:51 +00:00
2021-07-22 09:46:54 +00:00
# 4/6 Start node and mounted protocols
proc startNode ( node : WakuNode , conf : WakuNodeConf ) : SetupResult [ bool ] =
## Start a configured node and all mounted protocols.
## Resume history, connect to static nodes and start
## keep-alive, if configured.
2021-03-26 08:49:51 +00:00
2021-07-22 09:46:54 +00:00
# Start Waku v2 node
waitFor node . start ( )
# Resume historical messages, this has to be called after the node has been started
if conf . store and conf . persistMessages :
waitFor node . resume ( )
# Connect to configured static nodes
if conf . staticnodes . len > 0 :
waitFor connectToNodes ( node , conf . staticnodes )
2021-08-12 08:51:38 +00:00
# Connect to discovered nodes
if conf . dnsDiscovery and conf . dnsDiscoveryUrl ! = " " :
2021-08-25 11:57:35 +00:00
debug " Discovering nodes using Waku DNS discovery " , url = conf . dnsDiscoveryUrl
var nameServers : seq [ TransportAddress ]
for ip in conf . dnsDiscoveryNameServers :
nameServers . add ( initTAddress ( ip , Port ( 53 ) ) ) # Assume all servers use port 53
let dnsResolver = DnsResolver . new ( nameServers )
proc resolver ( domain : string ) : Future [ string ] {. async , gcsafe . } =
trace " resolving " , domain = domain
let resolved = await dnsResolver . resolveTxt ( domain )
return resolved [ 0 ] # Use only first answer
2021-08-12 08:51:38 +00:00
2021-08-17 08:30:17 +00:00
var wakuDnsDiscovery = WakuDnsDiscovery . init ( conf . dnsDiscoveryUrl ,
2021-08-25 11:57:35 +00:00
resolver )
2021-08-12 08:51:38 +00:00
if wakuDnsDiscovery . isOk :
let discoveredPeers = wakuDnsDiscovery . get ( ) . findPeers ( )
if discoveredPeers . isOk :
info " Connecting to discovered peers "
waitFor connectToNodes ( node , discoveredPeers . get ( ) )
else :
warn " Failed to init Waku DNS discovery "
2020-11-16 08:38:52 +00:00
2021-07-22 09:46:54 +00:00
# Start keepalive, if enabled
if conf . keepAlive :
node . startKeepalive ( )
ok ( true ) # Success
2020-11-24 04:53:42 +00:00
2021-07-22 09:46:54 +00:00
# 5/6 Start monitoring tools and external interfaces
proc startExternal ( node : WakuNode , conf : WakuNodeConf ) : SetupResult [ bool ] =
## Start configured external interfaces and monitoring tools
## on a Waku v2 node, including the RPC API and metrics
## monitoring ports.
2021-05-13 21:21:46 +00:00
2021-07-22 09:46:54 +00:00
if conf . rpc :
startRpc ( node , conf . rpcAddress , Port ( conf . rpcPort + conf . portsShift ) , conf )
2021-05-13 21:21:46 +00:00
2021-07-22 09:46:54 +00:00
if conf . metricsLogging :
startMetricsLog ( )
if conf . metricsServer :
startMetricsServer ( conf . metricsServerAddress ,
Port ( conf . metricsServerPort + conf . portsShift ) )
ok ( true ) # Success
2021-05-28 18:41:29 +00:00
2021-07-22 09:46:54 +00:00
let
conf = WakuNodeConf . load ( )
2021-06-02 07:53:34 +00:00
2021-07-22 09:46:54 +00:00
var
node : WakuNode # This is the node we're going to setup using the conf
2021-05-04 12:11:41 +00:00
2021-07-22 09:46:54 +00:00
##############
# Node setup #
##############
2020-10-09 13:58:50 +00:00
2021-07-22 09:46:54 +00:00
debug " 1/6 Setting up storage "
2021-06-04 10:02:47 +00:00
2021-07-22 09:46:54 +00:00
var
pStorage : WakuPeerStorage
mStorage : WakuMessageStore
2021-04-29 04:54:31 +00:00
2021-07-22 09:46:54 +00:00
let setupStorageRes = setupStorage ( conf )
if setupStorageRes . isErr :
error " 1/6 Setting up storage failed. Continuing without storage. "
else :
( pStorage , mStorage ) = setupStorageRes . get ( )
debug " 2/6 Initializing node "
let initNodeRes = initNode ( conf , pStorage )
2021-04-29 04:54:31 +00:00
2021-07-22 09:46:54 +00:00
if initNodeRes . isErr :
error " 2/6 Initializing node failed. Quitting. "
quit ( QuitFailure )
else :
node = initNodeRes . get ( )
debug " 3/6 Mounting protocols "
2021-04-24 04:56:37 +00:00
2021-07-22 09:46:54 +00:00
let setupProtocolsRes = setupProtocols ( node , conf , mStorage )
2020-09-01 02:09:54 +00:00
2021-07-22 09:46:54 +00:00
if setupProtocolsRes . isErr :
error " 3/6 Mounting protocols failed. Continuing in current state. "
2020-09-01 02:09:54 +00:00
2021-07-22 09:46:54 +00:00
debug " 4/6 Starting node and mounted protocols "
2021-04-15 08:18:14 +00:00
2021-07-22 09:46:54 +00:00
let startNodeRes = startNode ( node , conf )
if startNodeRes . isErr :
error " 4/6 Starting node and mounted protocols failed. Continuing in current state. "
debug " 5/6 Starting monitoring and external interfaces "
let startExternalRes = startExternal ( node , conf )
if startExternalRes . isErr :
error " 5/6 Starting monitoring and external interfaces failed. Continuing in current state. "
debug " 6/6 Setting up shutdown hooks "
# 6/6 Setup graceful shutdown hooks
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
2021-04-15 08:18:14 +00:00
# Handle Ctrl-C SIGINT
proc handleCtrlC ( ) {. noconv . } =
when defined ( windows ) :
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc ( )
info " Shutting down after receiving SIGINT "
waitFor node . stop ( )
quit ( QuitSuccess )
setControlCHook ( handleCtrlC )
# Handle SIGTERM
when defined ( posix ) :
proc handleSigterm ( signal : cint ) {. noconv . } =
info " Shutting down after receiving SIGTERM "
waitFor node . stop ( )
quit ( QuitSuccess )
c_signal ( SIGTERM , handleSigterm )
2021-06-02 07:53:34 +00:00
2021-07-22 09:46:54 +00:00
debug " Node setup complete "
2020-09-01 02:09:54 +00:00
2021-05-28 18:41:29 +00:00
runForever ( )