2021-07-14 19:58:46 +02:00
# Module-wide exception tracking: routines declared after this push are limited
# to raising `Defect` unless they carry their own `raises` annotation.
{. push raises : [ Defect ] . }
2020-04-29 12:49:27 +08:00
import
2022-01-10 15:07:01 +01:00
std / [ hashes , options , tables , strutils , sequtils , os ] ,
2021-06-09 16:37:08 +02:00
chronos , chronicles , metrics ,
stew / shims / net as stewNet ,
2021-09-17 10:31:25 -07:00
stew / byteutils ,
2020-09-01 04:09:54 +02:00
eth / keys ,
2021-08-12 10:51:38 +02:00
eth / p2p / discoveryv5 / enr ,
2020-05-15 12:11:14 +08:00
libp2p / crypto / crypto ,
2021-06-15 10:55:47 +02:00
libp2p / protocols / ping ,
2022-01-10 15:07:01 +01:00
libp2p / protocols / pubsub / [ gossipsub , rpc / messages ] ,
2022-02-16 17:12:09 +01:00
libp2p / nameresolving / nameresolver ,
2022-01-10 15:07:01 +01:00
libp2p / [ builders , multihash ] ,
2021-11-02 10:29:11 +00:00
libp2p / transports / [ transport , tcptransport , wstransport ] ,
2021-07-13 09:18:51 +02:00
.. / protocol / [ waku_relay , waku_message ] ,
2020-11-24 12:34:32 +08:00
.. / protocol / waku_store / waku_store ,
2020-11-23 10:27:45 +08:00
.. / protocol / waku_swap / waku_swap ,
2021-01-05 12:52:10 +08:00
.. / protocol / waku_filter / waku_filter ,
2021-04-24 12:56:37 +08:00
.. / protocol / waku_lightpush / waku_lightpush ,
2021-09-17 10:31:25 -07:00
.. / protocol / waku_rln_relay / [ waku_rln_relay_types ] ,
2021-12-06 20:51:37 +01:00
.. / utils / [ peers , requests , wakuswitch , wakuenr ] ,
2021-06-25 14:06:56 -07:00
. / storage / migration / migration_types ,
2021-08-12 10:51:38 +02:00
. / peer_manager / peer_manager ,
2021-11-01 19:02:39 +01:00
. / dnsdisc / waku_dnsdisc ,
. / discv5 / waku_discv5
2020-10-21 11:54:29 +02:00
2021-07-16 17:13:36 +02:00
export
builders ,
waku_relay , waku_message ,
waku_store ,
waku_swap ,
waku_filter ,
waku_lightpush ,
waku_rln_relay_types
2021-06-08 11:56:32 -07:00
when defined ( rln ) :
2021-07-21 11:37:10 +02:00
import
libp2p / protocols / pubsub / rpc / messages ,
2022-01-21 10:55:36 -08:00
libp2p / protocols / pubsub / pubsub ,
2021-07-21 11:37:10 +02:00
web3 ,
2021-10-19 17:37:29 -07:00
.. / protocol / waku_rln_relay / [ rln , waku_rln_relay_utils ]
2021-06-08 11:56:32 -07:00
2022-01-26 12:02:57 +01:00
# Metrics published by the node. The bracketed lists are the label names that
# callers fill in through `labelValues`.
declarePublicCounter waku_node_messages,
  " number of messages received ",
  [" type ", " contentTopic "]
declarePublicGauge waku_node_filters,
  " number of content filter subscriptions "
declarePublicGauge waku_node_errors,
  " number of wakunode errors ",
  [" type "]
declarePublicCounter waku_node_conns_initiated,
  " number of connections initiated by this node ",
  [" source "]
2021-01-29 10:42:41 +02:00
2020-09-16 12:23:10 +08:00
# Log topic attached to the log statements of this module.
logScope:
  topics = " wakunode "
2020-09-11 06:16:45 +02:00
const
  # Default client identifier (exported).
  clientId* = " Nimbus Waku v2 node "

  # PubSub topic that `mountRelay` always places first in the relay's default topics.
  defaultTopic = " /waku/2/default-waku/proto "

  # Default value of the `filterTimeout` parameter of `mountFilter`.
  WakuFilterTimeout: Duration = 1.days
2021-11-15 13:29:18 +00:00
2020-04-29 12:49:27 +08:00
# Both `eth/keys` and libp2p's `crypto` are imported by this module; the aliases
# below pin the key type names to the `crypto` module.
type
  KeyPair* = crypto.KeyPair
  PublicKey* = crypto.PublicKey
  PrivateKey* = crypto.PrivateKey

  # XXX: Weird type, should probably be using pubsub Topic object name?
  Topic* = string
  Message* = seq[byte]

  WakuInfo* = object
    ## Node information returned by `info`.
    # NOTE One for simplicity, can extend later as needed
    listenAddresses*: seq[string]
    #multiaddrStrings*: seq[string]

  # NOTE based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object of RootObj
    ## Node state: the libp2p switch plus one field per protocol. `new` fills
    ## in only `peerManager`, `switch`, `rng`, `enr`, `filters` and
    ## `announcedAddresses`; the protocol fields are assigned by the
    ## `mount*` procs.
    peerManager*: PeerManager
    switch*: Switch
    wakuRelay*: WakuRelay
    wakuStore*: WakuStore
    wakuFilter*: WakuFilter
    wakuSwap*: WakuSwap
    wakuRlnRelay*: WakuRLNRelay
    wakuLightPush*: WakuLightPush
    enr*: enr.Record
    libp2pPing*: Ping
    filters*: Filters
    rng*: ref BrHmacDrbgContext
    wakuDiscv5*: WakuDiscoveryV5
    announcedAddresses*: seq[MultiAddress]
    started*: bool # Indicates that node has started listening
2020-11-24 12:34:32 +08:00
2021-07-27 08:48:56 +02:00
proc protocolMatcher(codec: string): Matcher =
  ## Builds a matcher that accepts every protocol id beginning with `codec`.
  ##
  ## For the codec `/vac/waku/filter/2.0.0` the matcher accepts
  ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3` and even
  ## `/vac/waku/filter/2.0.0-actualnonsense`: any postfix is allowed.
  proc hasCodecPrefix(candidate: string): bool {.gcsafe.} =
    # Plain prefix comparison against the captured codec.
    candidate.startsWith(codec)

  result = hasCodecPrefix
2020-11-10 09:13:16 +02:00
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
  ## Drops the content topics named in `contentFilters` from every entry of
  ## `filters`, then deletes the entries that have no content filter left.

  # Collect the content topics that are being unsubscribed.
  let droppedTopics = contentFilters.mapIt(it.contentTopic)
  debug " unsubscribing ", unsubscribeTopics = droppedTopics

  # Request ids are gathered first and deleted afterwards so the table is
  # never resized while it is being iterated.
  var emptiedIds: seq[string] = @[]
  for requestId, entry in filters.mpairs:
    entry.contentFilters.keepItIf(it.contentTopic notin droppedTopics)
    if entry.contentFilters.len == 0:
      emptiedIds.add(requestId)

  for requestId in emptiedIds:
    filters.del(requestId)

  debug " filters modified ", filters = filters
2021-11-30 10:12:09 +01:00
proc updateSwitchPeerInfo(node: WakuNode) =
  ## TODO: remove this when supported upstream
  ##
  ## Temporary workaround: nim-libp2p cannot yet announce addresses that
  ## differ from the bound ones, so the switch's peer info addresses are
  ## overwritten with the node's announced addresses.
  ##
  ## WARNING: only call this after the switch has been started.
  if node.announcedAddresses.len == 0:
    # Nothing to announce; keep whatever the switch already has.
    return

  node.switch.peerInfo.addrs = node.announcedAddresses
2022-02-18 12:10:38 +01:00
# Small MultiAddress builders used by `new` when assembling the bind and
# announced addresses.

template ip4TcpEndPoint(address, port): MultiAddress =
  ## TCP endpoint built from an address and a port.
  MultiAddress.init(address, tcpProtocol, port)

template dns4Ma(dns4DomainName: string): MultiAddress =
  ## dns4 component for the given domain name.
  MultiAddress.init(" /dns4/ " & dns4DomainName).tryGet()

template tcpPortMa(port: Port): MultiAddress =
  ## tcp component for the given port.
  MultiAddress.init(" /tcp/ " & $port).tryGet()

template dns4TcpEndPoint(dns4DomainName: string, port: Port): MultiAddress =
  ## dns4 component followed by the tcp component.
  dns4Ma(dns4DomainName) & tcpPortMa(port)

template wsFlag(wssEnabled: bool): MultiAddress =
  ## Websocket suffix: the secure variant when `wssEnabled` is true.
  MultiAddress.init(if wssEnabled: " /wss " else: " /ws ").tryGet()
2020-09-01 04:09:54 +02:00
2021-07-14 19:58:46 +02:00
proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
    bindIp: ValidIpAddress, bindPort: Port,
    extIp = none[ValidIpAddress](), extPort = none[Port](),
    peerStorage: PeerStorage = nil,
    maxConnections = builders.MaxConnections,
    wsBindPort: Port = Port(8000),
    wsEnabled: bool = false,
    wssEnabled: bool = false,
    secureKey: string = " ",
    secureCert: string = " ",
    wakuFlags = none(WakuEnrBitfield),
    nameResolver: NameResolver = nil,
    dns4DomainName = none(string)
    ): T
    {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
  ## Creates a Waku Node.
  ##
  ## * `bindIp`/`bindPort` - address the TCP transport binds to.
  ## * `extIp`/`extPort` - externally reachable address, if any. When absent
  ##   the bind address is announced and written to the ENR instead.
  ## * `wsEnabled`/`wssEnabled`/`wsBindPort` - enable a websocket listener
  ##   (plain or secure) on `wsBindPort`.
  ## * `secureKey`/`secureCert` - forwarded to the switch as
  ##   `secureKeyPath`/`secureCertPath`.
  ## * `dns4DomainName` - when set, announced addresses use this domain name
  ##   instead of an IP address.
  ## * `peerStorage`, `maxConnections`, `wakuFlags`, `nameResolver` - passed
  ##   through to the peer manager, switch and ENR builders.
  ##
  ## Only the networking state is initialised here; protocols are attached
  ## afterwards through the `mount*` procs.
  ##
  ## Status: Implemented.

  ## Initialize addresses
  let
    # Bind addresses
    hostAddress = ip4TcpEndPoint(bindIp, bindPort)
    wsHostAddress =
      if wsEnabled or wssEnabled:
        some(ip4TcpEndPoint(bindIp, wsBindPort) & wsFlag(wssEnabled))
      else:
        none(MultiAddress)

  # Setup external addresses, if available
  var hostExtAddress, wsExtAddress = none(MultiAddress)

  if dns4DomainName.isSome():
    # Use dns4 for externally announced addresses. A domain name may be
    # configured without an external port; fall back to the bind port in that
    # case (mirrors the ENR port fallback below) instead of failing on an
    # empty Option.
    let announcedPort = extPort.get(bindPort)
    hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), announcedPort))

    if wsHostAddress.isSome():
      wsExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), wsBindPort) & wsFlag(wssEnabled))
  elif extIp.isSome() and extPort.isSome():
    # No public domain name, use ext IP if available
    hostExtAddress = some(ip4TcpEndPoint(extIp.get(), extPort.get()))

    if wsHostAddress.isSome():
      wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled))

  # External addresses take precedence over bind addresses when announcing.
  var announcedAddresses: seq[MultiAddress]
  if hostExtAddress.isSome:
    announcedAddresses.add(hostExtAddress.get())
  else:
    announcedAddresses.add(hostAddress) # We always have at least a bind address for the host

  if wsExtAddress.isSome:
    announcedAddresses.add(wsExtAddress.get())
  elif wsHostAddress.isSome:
    announcedAddresses.add(wsHostAddress.get())

  ## Initialize peer
  let
    rng = crypto.newRng()
    enrIp = if extIp.isSome(): extIp
            else: some(bindIp)
    enrTcpPort = if extPort.isSome(): extPort
                 else: some(bindPort)
    # Only add ws/wss to `multiaddrs` field
    enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()]
                    elif wsHostAddress.isSome: @[wsHostAddress.get()]
                    else: @[]
    enr = initEnr(nodeKey,
                  enrIp,
                  enrTcpPort, none(Port),
                  wakuFlags,
                  enrMultiaddrs)

  info " Initializing networking ", addrs = announcedAddresses

  let switch = newWakuSwitch(some(nodeKey),
    hostAddress,
    wsHostAddress,
    transportFlags = {ServerFlags.ReuseAddr},
    rng = rng,
    maxConnections = maxConnections,
    wssEnabled = wssEnabled,
    secureKeyPath = secureKey,
    secureCertPath = secureCert,
    nameResolver = nameResolver)

  let wakuNode = WakuNode(
    peerManager: PeerManager.new(switch, peerStorage),
    switch: switch,
    rng: rng,
    enr: enr,
    filters: initTable[string, Filter](),
    announcedAddresses: announcedAddresses
  )

  return wakuNode
2021-04-13 07:56:49 +02:00
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
  ## Internal subscribe. Registers the default handler on `topic` when the
  ## relay does not know the topic yet and, if `handler` is given, registers
  ## it as well. Logs an error and does nothing when relay is not mounted.
  if node.wakuRelay.isNil:
    error " Invalid API call to `subscribe`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " subscribe ", topic = topic

  proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    # A default handler should be registered for all topics
    trace " Hit default handler ", topic = topic, data = data

    let decoded = WakuMessage.init(data)
    if decoded.isOk():
      let wakuMsg = decoded.value()

      # Notify mounted protocols of new message
      if not node.wakuFilter.isNil:
        await node.wakuFilter.handleMessage(topic, wakuMsg)

      if not node.wakuStore.isNil:
        await node.wakuStore.handleMessage(topic, wakuMsg)

      # Increase message counter
      var ctLabel = wakuMsg.contentTopic
      if ctLabel.len == 0:
        ctLabel = " none "
      waku_node_messages.inc(labelValues = [" relay ", ctLabel])

  let relay = node.wakuRelay

  if topic notin PubSub(relay).topics:
    # Add default handler only for new topics
    debug " Registering default handler ", topic = topic
    relay.subscribe(topic, defaultHandler)

  if handler.isSome:
    debug " Registering handler ", topic = topic
    relay.subscribe(topic, handler.get())
2021-02-02 13:33:59 +02:00
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Subscribes `handler` to a PubSub topic; it is triggered for messages
  ## received on that topic. A TopicHandler takes a topic and some data.
  ##
  ## NOTE The data field SHOULD be decoded as a WakuMessage.
  ## Status: Implemented.
  subscribe(node, topic, some(handler))
2020-07-27 17:01:06 +08:00
2020-10-02 14:48:56 +02:00
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
  ## Registers `handler` for messages matching the content filters in
  ## `request`. The handler is stored locally whether or not the remote
  ## subscription succeeds.
  ##
  ## Status: Implemented.

  # Sanity check for well-formed subscribe FilterRequest
  doAssert(request.subscribe, " invalid subscribe request ")

  info " subscribe content ", filter = request

  # Locally generated id; replaced by the remote one on success.
  var requestId = generateRequestId(node.rng)

  if not node.wakuFilter.isNil:
    let remoteId = await node.wakuFilter.subscribe(request)

    if remoteId.isNone():
      # Failed to subscribe
      error " remote subscription to filter failed ", filter = request
      waku_node_errors.inc(labelValues = [" subscribe_filter_failure "])
    else:
      # Subscribed successfully.
      requestId = remoteId.get()

  # Register handler for filter, whether remote subscription succeeded or not
  node.filters[requestId] = Filter(contentFilters: request.contentFilters, handler: handler)
  waku_node_filters.set(node.filters.len.int64)
2021-02-02 13:33:59 +02:00
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Removes a single handler from a PubSub topic. Logs an error and does
  ## nothing when relay is not mounted.
  ##
  ## Status: Implemented.
  if node.wakuRelay.isNil:
    error " Invalid API call to `unsubscribe`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " unsubscribe ", topic = topic

  node.wakuRelay.unsubscribe(@[(topic, handler)])
2020-10-27 03:13:56 +02:00
2021-02-02 13:33:59 +02:00
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
  ## Removes every handler registered on a PubSub topic. Logs an error and
  ## does nothing when relay is not mounted.
  ##
  ## Status: Implemented.
  if node.wakuRelay.isNil:
    error " Invalid API call to `unsubscribeAll`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " unsubscribeAll ", topic = topic

  node.wakuRelay.unsubscribeAll(topic)
2020-10-27 03:13:56 +02:00
2020-07-27 17:01:06 +08:00
2020-11-10 09:13:16 +02:00
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
  ## Unsubscribe from a content filter.
  ##
  ## Sends the unsubscribe request through the filter protocol when it is
  ## mounted, then removes the matching content filters from the node's local
  ## filter table and refreshes the filter gauge.
  ##
  ## Status: Implemented.

  # Sanity check for well-formed unsubscribe FilterRequest
  doAssert(request.subscribe == false, " invalid unsubscribe request ")

  info " unsubscribe content ", filter = request

  # `subscribe` tolerates an unmounted filter protocol and still registers the
  # handler locally; do the same here rather than dereferencing nil.
  if not node.wakuFilter.isNil:
    await node.wakuFilter.unsubscribe(request)

  node.filters.removeContentFilters(request.contentFilters)

  waku_node_filters.set(node.filters.len.int64)
2020-11-16 17:55:49 +08:00
2021-10-19 17:37:29 -07:00
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
  ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
  ## `contentTopic` field for light node functionality. This field may also
  ## be omitted.
  ##
  ## Logs an error and returns without publishing when relay is not mounted.
  ## The result of the relay's `publish` call is discarded.
  ##
  ## Status: Implemented.
  if node.wakuRelay.isNil:
    error " Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead. "
    # @TODO improved error handling
    return

  debug " publish ", topic = topic, contentTopic = message.contentTopic

  # Encode the message and hand the raw bytes to the relay.
  let data = message.encode().buffer

  discard await node.wakuRelay.publish(topic, data)
2020-07-27 17:01:06 +08:00
2021-04-24 12:56:37 +08:00
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe.} =
  ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
  ## Returns whether relaying was successful or not in `handler`.
  ## `WakuMessage` should contain a `contentTopic` field for light node
  ## functionality. This field may also be omitted.
  ##
  ## Logs an error and returns without invoking `handler` when the lightpush
  ## protocol has not been set on the node.
  ##
  ## Status: Implemented.
  if node.wakuLightPush.isNil:
    # Same guard style as `publish`: report the misuse instead of
    # dereferencing a nil protocol object.
    error "Invalid API call to `lightpush`. WakuLightPush not mounted."
    # @TODO improved error handling
    return

  debug " Publishing with lightpush ", topic = topic, contentTopic = message.contentTopic

  let rpc = PushRequest(pubSubTopic: topic, message: message)

  await node.wakuLightPush.request(rpc, handler)
2020-10-06 11:33:28 +08:00
proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Queries known nodes for historical messages. `handler` is triggered
  ## whenever a response is received; a QueryHandlerFunc takes a
  ## HistoryResponse.
  ##
  ## Status: Implemented.

  # TODO Once waku swap is less experimental, this can simplified
  if not node.wakuSwap.isNil:
    debug " Using SWAPAccounting query "
    # TODO wakuSwap now part of wakuStore object
    await node.wakuStore.queryWithAccounting(query, handler)
  else:
    debug " Using default query "
    await node.wakuStore.query(query, handler)
2020-11-16 17:55:49 +08:00
2021-10-06 14:29:08 +02:00
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
  ## Retrieves the history of waku messages published on the default waku
  ## pubsub topic since the last time the node was online, by delegating to
  ## the store protocol's `resume`.
  ##
  ## * The store protocol must be mounted in full mode (i.e. persisting
  ##   messages); retrieved messages end up in the store's messages field and
  ##   in the message db.
  ## * The offline window is the difference between the current time and the
  ##   timestamp of the most recent persisted message, plus an offset of 20
  ##   seconds to account for asynchrony between nodes.
  ## * `peerList` names the peers to query; history is fetched from the first
  ##   available one. Such candidates should come from a discovery method (to
  ##   be developed). Without a list, a peer is picked randomly from the
  ##   store protocol's peer manager.
  ## * Fetching succeeds if the dialed peer was online during the queried
  ##   time window.
  ##
  ## Does nothing when the store protocol is not mounted.
  if node.wakuStore.isNil:
    return

  let retrievedMessages = await node.wakuStore.resume(peerList)
  if retrievedMessages.isOk:
    info " the number of retrieved messages since the last online time: ", number = retrievedMessages.value
2020-10-06 11:33:28 +08:00
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
  ## Returns information about the Node, such as what multiaddress it can be
  ## reached at: every announced address suffixed with the node's peer id.
  ##
  ## Status: Implemented.
  let peerInfo = node.switch.peerInfo

  WakuInfo(
    listenAddresses: node.announcedAddresses.mapIt($it & " /p2p/ " & $peerInfo.peerId)
  )
2020-07-27 17:01:06 +08:00
2021-12-08 18:35:47 +00:00
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError].} =
  ## Creates the filter protocol with a push handler bound to this node and
  ## mounts it on the switch.
  info " mounting filter "

  proc filterHandler(requestId: string, msg: MessagePush)
      {.gcsafe, raises: [Defect, KeyError].} =
    info " push received "

    for pushed in msg.messages:
      # Trigger filter handlers on a light node
      node.filters.notify(pushed, requestId)

      # Increase message counter
      var ctLabel = pushed.contentTopic
      if ctLabel.len == 0:
        ctLabel = " none "
      waku_node_messages.inc(labelValues = [" filter ", ctLabel])

  node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
  node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
2020-10-20 04:36:27 +02:00
2020-11-24 12:53:42 +08:00
# NOTE: When the swap protocol is used it has to be mounted before store,
# because store keeps a reference to the swap protocol.
proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.raises: [Defect, LPError].} =
  ## Creates the swap protocol from `swapConfig` and mounts it on the switch.
  info " mounting swap ", mode = $swapConfig.mode

  let swap = WakuSwap.init(node.peerManager, node.rng, swapConfig)
  node.wakuSwap = swap
  node.switch.mount(swap, protocolMatcher(WakuSwapCodec))
  # NYI - Do we need this?
  #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
2021-11-03 11:59:51 +01:00
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity) {.raises: [Defect, LPError].} =
  ## Creates the store protocol and mounts it on the switch. The node's swap
  ## protocol is handed to the store when one has been mounted beforehand.
  info " mounting store "

  let swapMounted = not node.wakuSwap.isNil
  if swapMounted:
    debug " mounting store with swap "
    node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap,
                                    persistMessages = persistMessages, capacity = capacity)
  else:
    debug " mounting store without swap "
    node.wakuStore = WakuStore.init(node.peerManager, node.rng, store,
                                    persistMessages = persistMessages, capacity = capacity)

  node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
2021-09-24 10:32:45 -07:00
2021-06-08 11:56:32 -07:00
when defined ( rln ) :
2022-02-16 14:52:21 -08:00
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
  ## Thin wrapper around the pubsub `addValidator` method: installs a
  ## validator for waku messages published on `pubsubTopic` and
  ## `contentTopic`. With an empty `contentTopic`, all messages published on
  ## `pubsubTopic` are validated.
  ## The validation logic follows https://rfc.vac.dev/spec/17/
  ##
  ## `spamHandler`, when provided, is invoked with every message classified
  ## as spam.
  proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
    debug " rln-relay topic validator is called "

    let decoded = WakuMessage.init(message.data)
    # NOTE(review): no explicit result is returned when decoding fails, so the
    # outcome is the default value of `ValidationResult` - confirm intended.
    if decoded.isOk():
      let wakumessage = decoded.value()

      # check the contentTopic: messages carrying a different, non-empty
      # content topic are accepted without running the RLN validation
      let differentTopic =
        wakumessage.contentTopic != " " and
        contentTopic != " " and
        wakumessage.contentTopic != contentTopic
      if differentTopic:
        debug " content topic did not match: ", contentTopic = wakumessage.contentTopic, payload = string.fromBytes(wakumessage.payload)
        return pubsub.ValidationResult.Accept

      # validate the message
      let verdict = node.wakuRlnRelay.validateMessage(wakumessage)
      case verdict:
        of Valid:
          debug " message validity is verified, relaying: ", wakumessage = wakumessage, payload = string.fromBytes(wakumessage.payload)
          return pubsub.ValidationResult.Accept
        of Invalid:
          debug " message validity could not be verified, discarding: ", wakumessage = wakumessage, payload = string.fromBytes(wakumessage.payload)
          return pubsub.ValidationResult.Reject
        of Spam:
          debug " A spam message is found! yay! discarding: ", wakumessage = wakumessage, payload = string.fromBytes(wakumessage.payload)
          if spamHandler.isSome:
            let onSpam = spamHandler.get
            onSpam(wakumessage)
          return pubsub.ValidationResult.Reject

  # set a validator for the supplied pubsubTopic
  PubSub(node.wakuRelay).addValidator(pubsubTopic, validator)
2021-07-14 19:58:46 +02:00
proc mountRlnRelay * ( node : WakuNode ,
2021-08-26 16:14:51 -07:00
ethClientAddrOpt : Option [ string ] = none ( string ) ,
2022-01-21 10:55:36 -08:00
ethAccAddrOpt : Option [ web3 . Address ] = none ( web3 . Address ) ,
memContractAddOpt : Option [ web3 . Address ] = none ( web3 . Address ) ,
2021-08-26 16:14:51 -07:00
groupOpt : Option [ seq [ IDCommitment ] ] = none ( seq [ IDCommitment ] ) ,
memKeyPairOpt : Option [ MembershipKeyPair ] = none ( MembershipKeyPair ) ,
2021-09-24 10:32:45 -07:00
memIndexOpt : Option [ MembershipIndex ] = none ( MembershipIndex ) ,
onchainMode : bool = true ,
2022-02-04 15:58:27 -08:00
pubsubTopic : string ,
2022-02-16 14:52:21 -08:00
contentTopic : ContentTopic ,
spamHandler : Option [ SpamHandler ] = none ( SpamHandler ) ) {. async . } =
## Mounts the RLN relay on `node`.
##
## Steps performed, in order:
## 1. log an error and return if relay is not mounted or `pubsubTopic` is not
##    among the relay's default topics;
## 2. log an error and return if a required optional input is missing;
## 3. create an RLN instance and insert every member of the group into it;
## 4. build the `WakuRLNRelay` object and, when `onchainMode` is set, register
##    it with the membership contract;
## 5. install the RLN validator for `pubsubTopic`/`contentTopic` and store the
##    result in `node.wakuRlnRelay`.
##
## Failed `doAssert` checks (membership index mismatch, RLN instance
## creation, member insertion, registration) are not reported through logs.
2021-06-08 11:56:32 -07:00
# TODO return a bool value to indicate the success of the call
# check whether inputs are provided
2021-09-24 10:32:45 -07:00
# relay protocol is the prerequisite of rln-relay
if node . wakuRelay . isNil :
error " Failed to mount WakuRLNRelay. Relay protocol is not mounted. "
return
# check whether the pubsub topic is supported at the relay level
if pubsubTopic notin node . wakuRelay . defaultTopics :
error " Failed to mount WakuRLNRelay. The relay protocol does not support the configured pubsub topic. " , pubsubTopic = pubsubTopic
return
2021-09-17 10:31:25 -07:00
# on-chain mode needs the contract, client and account addresses
# NOTE(review): indentation is lost in this view, so which `if` the `else:`
# below pairs with cannot be confirmed here; presumably `if onchainMode`.
if onchainMode :
if memContractAddOpt . isNone ( ) :
error " failed to mount rln relay: membership contract address is not provided "
return
if ethClientAddrOpt . isNone ( ) :
error " failed to mount rln relay: Ethereum client address is not provided "
return
if ethAccAddrOpt . isNone ( ) :
error " failed to mount rln relay: Ethereum account address is not provided "
return
else :
if groupOpt . isNone ( ) :
error " failed to mount rln relay: group information is not provided "
return
2021-08-26 16:14:51 -07:00
if memKeyPairOpt . isNone ( ) :
2021-09-17 10:31:25 -07:00
error " failed to mount rln relay: membership key of the node is not provided "
2021-08-26 16:14:51 -07:00
return
if memIndexOpt . isNone ( ) :
2021-09-17 10:31:25 -07:00
error " failed to mount rln relay: membership index is not provided "
2021-08-26 16:14:51 -07:00
return
2021-09-17 10:31:25 -07:00
# the Ethereum-related values are only unpacked in on-chain mode; otherwise
# they keep their default-initialised values
var
ethClientAddr : string
2022-01-21 10:55:36 -08:00
ethAccAddr : web3 . Address
memContractAdd : web3 . Address
2021-09-17 10:31:25 -07:00
if onchainMode :
2021-08-26 16:14:51 -07:00
ethClientAddr = ethClientAddrOpt . get ( )
ethAccAddr = ethAccAddrOpt . get ( )
memContractAdd = memContractAddOpt . get ( )
2021-09-17 10:31:25 -07:00
# NOTE(review): `groupOpt.get()` runs regardless of `onchainMode`, while the
# only visible `groupOpt.isNone()` check appears to sit on the off-chain
# path - confirm that on-chain callers always supply a group.
let
2021-08-26 16:14:51 -07:00
group = groupOpt . get ( )
memKeyPair = memKeyPairOpt . get ( )
memIndex = memIndexOpt . get ( )
# check the peer's index and the inclusion of user's identity commitment in the group
doAssert ( ( memKeyPair . idCommitment ) = = group [ int ( memIndex ) ] )
2021-06-08 11:56:32 -07:00
# create an RLN instance
2021-09-17 10:31:25 -07:00
var rlnInstance = createRLNInstance ( )
2021-08-24 12:25:29 -07:00
doAssert ( rlnInstance . isOk )
2021-08-26 16:14:51 -07:00
var rln = rlnInstance . value
# generate the membership keys if none is provided
2021-09-17 10:31:25 -07:00
# in a happy path, this condition never gets through for a static group of users
2021-08-26 16:14:51 -07:00
# the node should pass its keys i.e., memKeyPairOpt to the function
# NOTE(review): `memKeyPairOpt.get()` was already evaluated above, so this
# branch looks unreachable, and the generated key pair is only logged, never
# stored - confirm before relying on it.
if not memKeyPairOpt . isSome :
let membershipKeyPair = rln . membershipKeyGen ( )
# check whether keys are generated
doAssert ( membershipKeyPair . isSome ( ) )
debug " the membership key for the rln relay is generated " , idKey = membershipKeyPair . get ( ) . idKey . toHex , idCommitment = membershipKeyPair . get ( ) . idCommitment . toHex
# add members to the Merkle tree
for index in 0 .. group . len - 1 :
let member = group [ index ]
let member_is_added = rln . insertMember ( member )
doAssert ( member_is_added )
2021-10-19 17:37:29 -07:00
2021-08-26 16:14:51 -07:00
# create the WakuRLNRelay
var rlnPeer = WakuRLNRelay ( membershipKeyPair : memKeyPair ,
membershipIndex : memIndex ,
membershipContractAddress : memContractAdd ,
ethClientAddress : ethClientAddr ,
ethAccountAddress : ethAccAddr ,
2021-10-19 17:37:29 -07:00
rlnInstance : rln ,
2022-02-04 15:58:27 -08:00
pubsubTopic : pubsubTopic ,
contentTopic : contentTopic )
2021-10-19 17:37:29 -07:00
2021-09-17 10:31:25 -07:00
if onchainMode :
# register the rln-relay peer to the membership contract
let is_successful = await rlnPeer . register ( )
# check whether registration is done
doAssert ( is_successful )
debug " peer is successfully registered into the membership contract "
2021-06-08 11:56:32 -07:00

2021-09-24 10:32:45 -07:00
# adds a topic validator for the supplied pubsub topic at the relay protocol
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
# the topic validator checks for the correct non-spamming proof of the message
2022-02-16 14:52:21 -08:00
addRLNRelayValidator ( node , pubsubTopic , contentTopic , spamHandler )
debug " rln relay topic validator is mounted successfully " , pubsubTopic = pubsubTopic , contentTopic = contentTopic
2021-09-24 10:32:45 -07:00

2021-06-08 11:56:32 -07:00
# the validator is installed before the relay object is stored on the node
node . wakuRlnRelay = rlnPeer
2021-07-21 11:37:10 +02:00
2021-07-22 11:46:54 +02:00
proc startRelay*(node: WakuNode) {.async.} =
  ## Setup and start relay protocol: subscribes to the relay's default
  ## topics, reconnects to previously known relay peers and starts the relay.
  ## Only emits a trace message when relay is not mounted.
  if node.wakuRelay.isNil:
    trace " Failed to start relay. Not mounted. "
    return

  info " starting relay "

  # Topic subscriptions
  for defaultRelayTopic in node.wakuRelay.defaultTopics:
    node.subscribe(defaultRelayTopic, none(TopicHandler))

  # Resume previous relay connections
  let relayMatcher = protocolMatcher(WakuRelayCodec)
  if node.peerManager.hasPeers(relayMatcher):
    info " Found previous WakuRelay peers. Reconnecting. "

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)

    await node.peerManager.reconnectPeers(WakuRelayCodec,
                                          protocolMatcher(WakuRelayCodec),
                                          backoffPeriod)

  # Start the WakuRelay protocol
  await node.wakuRelay.start()

  info " relay started successfully "
2021-05-04 14:11:41 +02:00
proc mountRelay*(node: WakuNode,
                 topics: seq[string] = newSeq[string](),
                 relayMessages = true,
                 triggerSelf = true)
  # @TODO: Better error handling: CatchableError is raised by `waitFor`
  {.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
  ## Creates a WakuRelay instance for `node`'s switch and mounts it there.
  ##
  ## * `topics` - extra pubsub topics, added after the hard-coded default
  ##   topic to form the relay's default topics.
  ## * `relayMessages` - when `false` the relay is mounted on the switch
  ##   only and `node.wakuRelay` is left untouched.
  ## * `triggerSelf` - forwarded unchanged to `WakuRelay.init`.
  ##
  ## If the node is already started, `startRelay` is run synchronously.

  func msgIdProvider(m: messages.Message): seq[byte] =
    ## Message ID: the buffer of the SHA2-256 multihash of the message
    ## data, or the bytes of the stringified `hash` when digesting fails.
    let hashed = MultiHash.digest("sha2-256", m.data)
    if hashed.isOk():
      hashed[].data.buffer
    else:
      ($m.data.hash).toBytes()

  let relay = WakuRelay.init(
    switch = node.switch,
    msgIdProvider = msgIdProvider,
    triggerSelf = triggerSelf,
    sign = false,
    verifySignature = false,
    maxMessageSize = MaxWakuMessageSize
  )

  info "mounting relay", relayMessages = relayMessages

  ## The default relay topics is the union of
  ## all configured topics plus the hard-coded defaultTopic(s)
  relay.defaultTopics = @[defaultTopic] & topics

  node.switch.mount(relay, protocolMatcher(WakuRelayCodec))

  ## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes).
  ## All nodes, however, currently require WakuRelay, regardless of desired capabilities.
  ## This is to allow protocol stream negotiation with relay-capable nodes to succeed.
  ## For such nodes relay is mounted on the switch only: no pubsub topics are
  ## subscribed to, the relay protocol is never started and node.wakuRelay remains nil.
  ## @TODO: in future, this WakuRelay dependency will be removed completely
  if relayMessages:
    node.wakuRelay = relay

  info "relay mounted successfully"

  if node.started:
    # Node has started already. Let's start relay too.
    waitFor node.startRelay()
2021-02-22 09:40:02 -08:00
2021-07-14 19:58:46 +02:00
proc mountLightPush*(node: WakuNode) {.raises: [Defect, LPError].} =
  ## Initialises the lightpush protocol for `node` and mounts it on the
  ## switch. The node's relay is handed to `WakuLightPush.init` only when
  ## relay is mounted (`node.wakuRelay` is not nil).
  info "mounting light push"

  let relayMounted = not node.wakuRelay.isNil
  if relayMounted:
    debug "mounting lightpush with relay"
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
  else:
    debug "mounting lightpush without relay"
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)

  node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
2021-04-24 12:56:37 +08:00
2021-07-14 19:58:46 +02:00
proc mountLibp2pPing*(node: WakuNode) {.raises: [Defect, LPError].} =
  ## Creates the libp2p ping protocol using the node's RNG and mounts it
  ## on the switch.
  ##
  ## Raises `LPError` when the ping protocol cannot be created; the
  ## original exception is attached as the parent of the raised error.
  info "mounting libp2p ping protocol"

  try:
    node.libp2pPing = Ping.new(rng = node.rng)
  except Exception as e:
    # This is necessary as `Ping.new*` does not have explicit `raises` requirement
    # @TODO: remove exception handling once explicit `raises` in ping module
    # Chain the cause so the root failure is not lost for the caller
    raise newException(LPError, "Failed to initialize ping protocol", e)

  node.switch.mount(node.libp2pPing)
2021-06-02 09:53:34 +02:00
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
  ## Pings every connected peer once per `keepalive` interval for as long
  ## as `node.started` is true.
  ##
  ## A peer that cannot be dialed or pinged is counted in the
  ## `keep_alive_failure` error metric and skipped; the remaining peers
  ## are still pinged and the loop keeps running.
  while node.started:
    # Keep all connected peers alive while running
    trace "Running keepalive"

    # First get a list of connected peer infos
    let peers = node.peerManager.peers()
                    .filterIt(node.peerManager.connectedness(it.peerId) == Connected)
                    .mapIt(it.toRemotePeerInfo())

    # Attempt to retrieve and ping the active outgoing connection for each peer
    for peer in peers:
      let connOpt = await node.peerManager.dialPeer(peer, PingCodec)

      if connOpt.isNone:
        # @TODO more sophisticated error handling here
        debug "failed to connect to remote peer", peer = peer
        waku_node_errors.inc(labelValues = ["keep_alive_failure"])
        # Move on to the next peer: a single unreachable peer must not
        # end the keepalive for everyone else
        continue

      try:
        discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
      except CancelledError as exc:
        # Never swallow cancellation
        raise exc
      except CatchableError as exc:
        # This loop is launched with `asyncSpawn`, so an exception escaping
        # it would not be handled by any caller
        debug "failed to ping remote peer", peer = peer, error = exc.msg
        waku_node_errors.inc(labelValues = ["keep_alive_failure"])

    await sleepAsync(keepalive)
proc startKeepalive*(node: WakuNode) =
  ## Spawns the keepalive loop for `node` with a fixed five minute interval.
  let interval = chronos.minutes(5) # 50% of the default chronosstream timeout duration

  info "starting keepalive", keepalive = interval

  asyncSpawn keepaliveLoop(node, interval)
2020-10-21 11:54:29 +02:00
## Helpers
2022-01-26 12:02:57 +01:00
proc connectToNode(n: WakuNode, remotePeer: RemotePeerInfo, source = "api") {.async.} =
  ## Dials `remotePeer` on the WakuRelay protocol through the peer manager
  ## and records the outcome in the node metrics.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  ## and labels the "connections initiated" metric on success.
  info "Connecting to node", remotePeer = remotePeer, source = source

  # NOTE This is dialing on WakuRelay protocol specifically
  # NOTE(review): the log statements index `addrs[0]`; presumably callers
  # never pass a peer without addresses - confirm
  info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
  let dialed = await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)

  if dialed.isNone():
    error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
    waku_node_errors.inc(labelValues = ["conn_init_failure"])
    return

  info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
  waku_node_conns_initiated.inc(labelValues = [source])
2020-09-11 06:16:45 +02:00
2021-07-14 19:58:46 +02:00
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  ## Parses `address` into remote peer info and registers it with the
  ## node's store protocol through `setPeer`.
  info "Set store peer", address = address

  n.wakuStore.setPeer(parseRemotePeerInfo(address))
2020-09-25 13:35:32 +02:00
2021-07-14 19:58:46 +02:00
proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  ## Parses `address` into remote peer info and registers it with the
  ## node's filter protocol through `setPeer`.
  info "Set filter peer", address = address

  n.wakuFilter.setPeer(parseRemotePeerInfo(address))
2020-10-09 15:58:50 +02:00
2021-07-14 19:58:46 +02:00
proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
  ## Parses `address` into remote peer info and registers it with the
  ## node's lightpush protocol through `setPeer`.
  info "Set lightpush peer", address = address

  n.wakuLightPush.setPeer(parseRemotePeerInfo(address))
2022-01-26 12:02:57 +01:00
proc connectToNodes*(n: WakuNode, nodes: seq[string], source = "api") {.async.} =
  ## Connects to each address in `nodes`, one after the other; every entry
  ## is parsed into remote peer info right before it is dialed.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for address in nodes:
    let peer = parseRemotePeerInfo(address)
    await connectToNode(n, peer, source)

  # The issue seems to be around peers not being fully connected when
  # trying to subscribe. So what we do is sleep to guarantee nodes are
  # fully connected.
  #
  # This issue was known to Dmitriy on nim-libp2p and may be resolvable
  # later.
  await sleepAsync(chronos.seconds(5))
2022-01-26 12:02:57 +01:00
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo], source = "api") {.async.} =
  ## Connects to each already-parsed peer in `nodes`, one after the other.
  ##
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for peer in nodes:
    await connectToNode(n, peer, source)

  # The issue seems to be around peers not being fully connected when
  # trying to subscribe. So what we do is sleep to guarantee nodes are
  # fully connected.
  #
  # This issue was known to Dmitriy on nim-libp2p and may be resolvable
  # later.
  await sleepAsync(chronos.seconds(5))
2020-10-09 15:58:50 +02:00
2021-11-01 19:02:39 +01:00
proc runDiscv5Loop(node: WakuNode) {.async.} =
  ## Continuously add newly discovered nodes
  ## using Node Discovery v5
  ##
  ## Runs for as long as the discv5 service reports that it is listening.
  if node.wakuDiscv5.isNil:
    warn "Trying to run discovery v5 while it's disabled"
    return

  info "Starting discovery loop"

  while node.wakuDiscv5.listening:
    trace "Running discovery loop"

    ## Query for a random target and collect all discovered nodes
    ## @TODO: we could filter nodes here
    let lookup = await node.wakuDiscv5.findRandomPeers()

    if lookup.isOk:
      let found = lookup.get()
      trace "Discovered peers", count = found.len()

      ## Only peers absent from the switch address book are dialed,
      ## i.e. peers we have not encountered before
      let unseen = found.filterIt(
        not node.switch.peerStore.addressBook.contains(it.peerId))

      if unseen.len > 0:
        debug "Connecting to newly discovered peers", count = unseen.len()
        await connectToNodes(node, unseen, "discv5")

    # Discovery `queryRandom` can have a synchronous fast path for example
    # when no peers are in the routing table. Don't run it in continuous loop.
    #
    # Also, give some time to dial the discovered nodes and update stats etc
    await sleepAsync(chronos.seconds(5))
proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Start Discovery v5 service
  ##
  ## Returns `true` once the service is listening, started and the
  ## discovery loop has been spawned. Returns `false` when discv5 is not
  ## set up on the node or when opening the listener fails.
  info "Starting discovery v5 service"

  if node.wakuDiscv5.isNil:
    return false

  ## First start listening on configured port
  try:
    trace "Start listening on discv5 port"
    node.wakuDiscv5.open()
  except CatchableError:
    error "Failed to start discovery service. UDP port may be already in use"
    return false

  ## Start Discovery v5
  trace "Start discv5 service"
  node.wakuDiscv5.start()

  trace "Start discovering new peers using discv5"
  asyncSpawn node.runDiscv5Loop()

  debug "Successfully started discovery v5 service"
  return true
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Stop Discovery v5 service
  ##
  ## Returns `true` when the service was set up on the node and is stopped
  ## by the time this proc completes, `false` when discv5 is not set up
  ## at all. (Previously no result was ever assigned, so callers always
  ## received `false`, even after a successful stop.)
  if node.wakuDiscv5.isNil:
    return false

  info "Stopping discovery v5 service"

  ## Stop Discovery v5 process and close listening port
  if node.wakuDiscv5.listening:
    trace "Stop listening on discv5 port"
    await node.wakuDiscv5.closeWait()

  debug "Successfully stopped discovery v5 service"
  return true
2021-07-22 11:46:54 +02:00
proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku Node and
  ## all its mounted protocols.
  ##
  ## The switch is started first; relay is started afterwards when it is
  ## mounted. `node.started` is set to `true` at the very end.
  ##
  ## Status: Implemented.
  await node.switch.start()

  # TODO Get this from WakuNode obj
  let peerInfo = node.switch.peerInfo
  info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs

  # Render every announced address as "[<address>/p2p/<peerId>]",
  # concatenated without a separator
  var listenStr = ""
  for address in node.announcedAddresses:
    listenStr.add("[" & $address & "/p2p/" & $peerInfo.peerId & "]")

  ## XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr
  info "Discoverable ENR", enr = node.enr.toURI()

  ## Update switch peer info with announced addrs
  node.updateSwitchPeerInfo()

  if not isNil(node.wakuRelay):
    await node.startRelay()

  info "Node started successfully"
  node.started = true
proc stop*(node: WakuNode) {.async.} =
  ## Stops the node: relay (when mounted), then discovery v5 (when set
  ## up), then the switch. `node.started` is cleared last.
  if not isNil(node.wakuRelay):
    await node.wakuRelay.stop()

  if not isNil(node.wakuDiscv5):
    # The outcome of stopping discovery is deliberately ignored
    discard await node.stopDiscv5()

  await node.switch.stop()

  node.started = false
2021-07-14 19:58:46 +02:00
{. pop . } # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
2020-10-21 11:54:29 +02:00
when isMainModule :
2021-07-22 11:46:54 +02:00
## Node setup happens in 6 phases:
## 1. Set up storage
## 2. Initialize node
## 3. Mount and initialize configured protocols
## 4. Start node and mounted protocols
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
2020-10-21 11:54:29 +02:00
import
2021-07-22 11:46:54 +02:00
confutils ,
2021-04-15 10:18:14 +02:00
system / ansi_c ,
2022-02-16 17:12:09 +01:00
libp2p / nameresolving / dnsresolver ,
2021-07-22 11:46:54 +02:00
.. / .. / common / utils / nat ,
. / config ,
. / waku_setup ,
2021-03-25 10:37:11 +02:00
. / storage / message / waku_message_store ,
2021-07-22 11:46:54 +02:00
. / storage / peer / waku_peer_storage
logScope :
topics = " wakunode.setup "
###################
# Setup functions #
###################
# 1/6 Setup storage
proc setupStorage(conf: WakuNodeConf):
    SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] =
  ## Setup a SQLite Database for a wakunode based on a supplied
  ## configuration file and perform all necessary migration.
  ##
  ## If config allows, return peer storage and message store
  ## for use elsewhere. A store that is not enabled, or that fails to
  ## initialise, is left at its default value in the returned tuple.
  ## Only a failure to open the database itself yields an error result.
  var storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]

  # No database configured: nothing to set up
  if conf.dbPath == "":
    return ok(storeTuple)

  # Setup DB
  let dbRes = SqliteDatabase.init(conf.dbPath)
  if dbRes.isErr:
    warn "failed to init database", err = dbRes.error
    waku_node_errors.inc(labelValues = ["init_db_failure"])
    return err("failed to init database")

  let sqliteDatabase = dbRes.value
  if sqliteDatabase.isNil:
    return ok(storeTuple)

  # Database initialized. Let's set it up
  sqliteDatabase.runMigrations(conf) # First migrate what we have

  if conf.persistPeers:
    # Peer persistence enable. Set up Peer table in storage
    let peerStoreRes = WakuPeerStorage.new(sqliteDatabase)
    if peerStoreRes.isOk:
      storeTuple.pStorage = peerStoreRes.value
    else:
      warn "failed to init new WakuPeerStorage", err = peerStoreRes.error
      waku_node_errors.inc(labelValues = ["init_store_failure"])

  if conf.persistMessages:
    # Historical message persistence enable. Set up Message table in storage
    let msgStoreRes = WakuMessageStore.init(sqliteDatabase)
    if msgStoreRes.isOk:
      storeTuple.mStorage = msgStoreRes.value
    else:
      warn "failed to init WakuMessageStore", err = msgStoreRes.error
      waku_node_errors.inc(labelValues = ["init_store_failure"])

  return ok(storeTuple)
# 2/6 Initialize node
proc initNode(conf: WakuNodeConf,
              pStorage: WakuPeerStorage = nil): SetupResult[WakuNode] =
  ## Setup a basic Waku v2 node based on a supplied configuration
  ## file. Optionally include persistent peer storage.
  ## No protocols are mounted yet.
  ##
  ## When discv5 discovery is enabled in `conf`, a discovery service is
  ## created and attached to the node here as well.

  # Support for DNS multiaddrs: the resolver stays nil unless enabled
  var dnsResolver: DnsResolver
  if conf.dnsAddrs:
    # Assume all servers use port 53
    let nameServers = conf.dnsAddrsNameServers.mapIt(initTAddress(it, Port(53)))
    dnsResolver = DnsResolver.new(nameServers)

  let
    # Every configured port is offset by `conf.portsShift`
    tcpBindPort = Port(uint16(conf.tcpPort) + conf.portsShift)
    ## `udpPort` is only supplied to satisfy underlying APIs but is not
    ## actually a supported transport for libp2p traffic.
    udpPort = conf.tcpPort
    (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat,
                                               clientId,
                                               tcpBindPort,
                                               Port(uint16(udpPort) + conf.portsShift))
    ## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
    ## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
    ## config, the external port is the same as the bind port.
    extPort =
      if extIp.isSome() and extTcpPort.isNone():
        some(tcpBindPort)
      else:
        extTcpPort

    dns4DomainName =
      if conf.dns4DomainName != "":
        some(conf.dns4DomainName)
      else:
        none(string)

    wakuFlags = initWakuFlags(conf.lightpush,
                              conf.filter,
                              conf.store,
                              conf.relay)

    node = WakuNode.new(conf.nodekey,
                        conf.listenAddress, tcpBindPort,
                        extIp, extPort,
                        pStorage,
                        conf.maxConnections.int,
                        Port(uint16(conf.websocketPort) + conf.portsShift),
                        conf.websocketSupport,
                        conf.websocketSecureSupport,
                        conf.websocketSecureKeyPath,
                        conf.websocketSecureCertPath,
                        some(wakuFlags),
                        dnsResolver,
                        dns4DomainName)

  if conf.discv5Discovery:
    let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)

    # NOTE(review): this passes `extTcpPort` rather than the heuristic
    # `extPort` computed above - confirm that is intended
    node.wakuDiscv5 = WakuDiscoveryV5.new(
      extIp, extTcpPort, some(discv5UdpPort),
      conf.listenAddress,
      discv5UdpPort,
      conf.discv5BootstrapNodes,
      conf.discv5EnrAutoUpdate,
      keys.PrivateKey(conf.nodekey.skkey),
      wakuFlags,
      [], # Empty enr fields, for now
      node.rng
    )

  ok(node)
# 3/6 Mount and initialize configured protocols
proc setupProtocols(node: var WakuNode,
                    conf: WakuNodeConf,
                    mStorage: WakuMessageStore = nil): SetupResult[bool] =
  ## Setup configured protocols on an existing Waku v2 node.
  ## Optionally include persistent message storage.
  ## No protocols are started yet.
  ##
  ## Always finishes with `ok(true)`; failures of the individual mount
  ## procs are not converted into an error result here.

  # Mount relay on all nodes. `conf.topics` is a space-separated list and
  # `conf.relay` indicates if the node is capable to relay messages
  let configuredTopics = conf.topics.split(" ")
  mountRelay(node, configuredTopics, relayMessages = conf.relay)

  # Keepalive mounted on all nodes
  mountLibp2pPing(node)

  when defined(rln):
    if conf.rlnRelay:
      info "WakuRLNRelay is enabled"

      # set up rln relay inputs
      let (groupOpt, memKeyPairOpt, memIndexOpt) = rlnRelaySetUp(conf.rlnRelayMemIndex)

      if memIndexOpt.isSome:
        # mount rlnrelay in offline mode (for now)
        waitFor node.mountRlnRelay(groupOpt = groupOpt,
                                   memKeyPairOpt = memKeyPairOpt,
                                   memIndexOpt = memIndexOpt,
                                   onchainMode = false,
                                   pubsubTopic = conf.rlnRelayPubsubTopic,
                                   contentTopic = conf.rlnRelayContentTopic)

        info "membership id key", idkey = memKeyPairOpt.get().idKey.toHex
        info "membership id commitment key", idCommitmentkey = memKeyPairOpt.get().idCommitment.toHex

        # check the correct construction of the tree by comparing the calculated root against the expected root
        # no error should happen as it is already captured in the unit tests
        # TODO have added this check to account for unseen corner cases, will remove it later
        let
          root = node.wakuRlnRelay.rlnInstance.getMerkleRoot.value.toHex()
          expectedRoot = STATIC_GROUP_MERKLE_ROOT
        if root != expectedRoot:
          error "root mismatch: something went wrong not in Merkle tree construction"
        debug "the calculated root", root
        info "WakuRLNRelay is mounted successfully", pubsubtopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic
      else:
        error "failed to mount WakuRLNRelay"

  if conf.swap:
    mountSwap(node)
    # TODO Set swap peer, for now should be same as store peer

  # Store setup: mounted when the node serves store itself or has a
  # store node configured
  let hasStoreNode = conf.storenode != ""
  if hasStoreNode or conf.store:
    mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity)

    if hasStoreNode:
      setStorePeer(node, conf.storenode)

  # Lightpush setup. NOTE Must be mounted after relay
  let hasLightPushNode = conf.lightpushnode != ""
  if hasLightPushNode or conf.lightpush:
    mountLightPush(node)

    if hasLightPushNode:
      setLightPushPeer(node, conf.lightpushnode)

  # Filter setup. NOTE Must be mounted after relay
  let hasFilterNode = conf.filternode != ""
  if hasFilterNode or conf.filter:
    mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))

    if hasFilterNode:
      setFilterPeer(node, conf.filternode)

  ok(true) # Success
2021-03-26 10:49:51 +02:00
2021-07-22 11:46:54 +02:00
# 4/6 Start node and mounted protocols
proc startNode(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
  ## Start a configured node and all mounted protocols.
  ## Resume history, connect to static nodes and start
  ## keep-alive, if configured.
  ##
  ## Also connects to peers found through Waku DNS discovery when that
  ## is enabled and a discovery URL is configured. Always finishes with
  ## `ok(true)`.
  ##
  ## NOTE(review): nothing in this proc calls `startDiscv5`, even when
  ## `conf.discv5Discovery` is set - confirm it is started elsewhere.

  # Start Waku v2 node
  waitFor node.start()

  # Resume historical messages, this has to be called after the node has been started
  if conf.store and conf.persistMessages:
    waitFor node.resume()

  # Connect to configured static nodes
  if conf.staticnodes.len > 0:
    waitFor connectToNodes(node, conf.staticnodes, "static")

  # Connect to discovered nodes
  if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
    debug "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl

    var nameServers: seq[TransportAddress]
    for ip in conf.dnsDiscoveryNameServers:
      nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

    let dnsResolver = DnsResolver.new(nameServers)

    proc resolver(domain: string): Future[string] {.async, gcsafe.} =
      ## Returns the first TXT answer for `domain`, or an empty string
      ## when the lookup produced no answers.
      trace "resolving", domain = domain
      let resolved = await dnsResolver.resolveTxt(domain)
      if resolved.len == 0:
        # Indexing an empty answer list would raise an IndexDefect.
        # NOTE(review): assumes the discovery client treats an empty
        # record as an invalid entry - confirm
        warn "no TXT records resolved", domain = domain
        return ""
      return resolved[0] # Use only first answer

    var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
                                                 resolver)
    if wakuDnsDiscovery.isOk:
      let discoveredPeers = wakuDnsDiscovery.get().findPeers()
      if discoveredPeers.isOk:
        info "Connecting to discovered peers"
        waitFor connectToNodes(node, discoveredPeers.get(), "dnsdisc")
      else:
        # Make the failure visible instead of skipping it silently
        warn "Failed to find peers using Waku DNS discovery"
    else:
      warn "Failed to init Waku DNS discovery"

  # Start keepalive, if enabled
  if conf.keepAlive:
    node.startKeepalive()

  ok(true) # Success
2020-11-24 12:53:42 +08:00
2021-07-22 11:46:54 +02:00
# 5/6 Start monitoring tools and external interfaces
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
  ## Start configured external interfaces and monitoring tools
  ## on a Waku v2 node, including the RPC API and metrics
  ## monitoring ports.
  ##
  ## Both ports are offset by `conf.portsShift`. Always finishes with
  ## `ok(true)`.
  if conf.rpc:
    let rpcPort = Port(conf.rpcPort + conf.portsShift)
    startRpc(node, conf.rpcAddress, rpcPort, conf)

  if conf.metricsLogging:
    startMetricsLog()

  if conf.metricsServer:
    let metricsPort = Port(conf.metricsServerPort + conf.portsShift)
    startMetricsServer(conf.metricsServerAddress, metricsPort)

  ok(true) # Success
2021-05-28 11:41:29 -07:00
2021-07-22 11:46:54 +02:00
let conf = WakuNodeConf.load()

# This is the node we're going to setup using the conf
var node: WakuNode

##############
# Node setup #
##############

debug "1/6 Setting up storage"

var
  pStorage: WakuPeerStorage
  mStorage: WakuMessageStore

# A storage failure is not fatal: both stores simply stay unset
let setupStorageRes = setupStorage(conf)
if setupStorageRes.isOk:
  (pStorage, mStorage) = setupStorageRes.get()
else:
  error "1/6 Setting up storage failed. Continuing without storage."

debug "2/6 Initializing node"

# Without a node there is nothing to run, so this failure ends the process
let initNodeRes = initNode(conf, pStorage)
if initNodeRes.isErr:
  error "2/6 Initializing node failed. Quitting."
  quit(QuitFailure)

node = initNodeRes.get()

debug "3/6 Mounting protocols"

let setupProtocolsRes = setupProtocols(node, conf, mStorage)
if setupProtocolsRes.isErr:
  error "3/6 Mounting protocols failed. Continuing in current state."

debug "4/6 Starting node and mounted protocols"

let startNodeRes = startNode(node, conf)
if startNodeRes.isErr:
  error "4/6 Starting node and mounted protocols failed. Continuing in current state."

debug "5/6 Starting monitoring and external interfaces"

let startExternalRes = startExternal(node, conf)
if startExternalRes.isErr:
  error "5/6 Starting monitoring and external interfaces failed. Continuing in current state."

debug "6/6 Setting up shutdown hooks"

# 6/6 Setup graceful shutdown hooks
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.

# Handle Ctrl-C SIGINT
proc handleCtrlC() {.noconv.} =
  ## Stops the node and exits with a success code.
  when defined(windows):
    # workaround for https://github.com/nim-lang/Nim/issues/4057
    setupForeignThreadGc()
  info "Shutting down after receiving SIGINT"
  waitFor node.stop()
  quit(QuitSuccess)

setControlCHook(handleCtrlC)

# Handle SIGTERM
when defined(posix):
  proc handleSigterm(signal: cint) {.noconv.} =
    ## Stops the node and exits with a success code.
    info "Shutting down after receiving SIGTERM"
    waitFor node.stop()
    quit(QuitSuccess)

  c_signal(SIGTERM, handleSigterm)

debug "Node setup complete"

runForever()