2020-04-29 04:49:27 +00:00
import
2020-11-10 07:13:16 +00:00
std / [ options , tables , strutils , sequtils ] ,
2021-06-09 14:37:08 +00:00
chronos , chronicles , metrics ,
metrics / chronos_httpserver ,
stew / shims / net as stewNet ,
2020-08-31 03:32:41 +00:00
# TODO: Why do we need eth keys?
2020-09-01 02:09:54 +00:00
eth / keys ,
2021-02-22 17:40:02 +00:00
web3 ,
2020-05-15 04:11:14 +00:00
libp2p / multiaddress ,
libp2p / crypto / crypto ,
libp2p / protocols / protocol ,
2020-07-28 08:17:50 +00:00
# NOTE For TopicHandler, solve with exports?
2021-03-16 18:18:40 +00:00
libp2p / protocols / pubsub / rpc / messages ,
2020-07-28 08:17:50 +00:00
libp2p / protocols / pubsub / pubsub ,
2021-04-21 09:36:56 +00:00
libp2p / protocols / pubsub / gossipsub ,
2021-06-09 14:37:08 +00:00
libp2p / builders ,
2021-03-16 18:18:40 +00:00
.. / protocol / [ waku_relay , waku_message , message_notifier ] ,
2020-11-24 04:34:32 +00:00
.. / protocol / waku_store / waku_store ,
2020-11-23 02:27:45 +00:00
.. / protocol / waku_swap / waku_swap ,
2021-01-05 04:52:10 +00:00
.. / protocol / waku_filter / waku_filter ,
2021-04-24 04:56:37 +00:00
.. / protocol / waku_lightpush / waku_lightpush ,
2021-06-08 18:56:32 +00:00
.. / protocol / waku_rln_relay / waku_rln_relay_types ,
2021-06-02 07:53:34 +00:00
.. / protocol / waku_keepalive / waku_keepalive ,
2021-01-25 11:03:52 +00:00
.. / utils / peers ,
2021-03-25 08:37:11 +00:00
. / storage / message / message_store ,
2021-03-26 08:49:51 +00:00
. / storage / peer / peer_storage ,
2021-02-04 10:32:58 +00:00
.. / utils / requests ,
2021-03-26 08:49:51 +00:00
. / peer_manager / peer_manager
2020-10-21 09:54:29 +00:00
2021-06-08 18:56:32 +00:00
when defined ( rln ) :
import .. / protocol / waku_rln_relay / [ rln , waku_rln_relay_utils ]
2021-01-29 08:42:41 +00:00
# Metrics exported by this module. The counter and the error gauge are
# labelled, the filter gauge is not.
declarePublicCounter(waku_node_messages, " number of messages received ", [" type "])
declarePublicGauge(waku_node_filters, " number of content filter subscriptions ")
declarePublicGauge(waku_node_errors, " number of wakunode errors ", [" type "])

logScope:
  topics = " wakunode "

const
  # Default clientId
  clientId* = " Nimbus Waku v2 node "

  # Default topic
  defaultTopic = " /waku/2/default-waku/proto "
2020-04-29 04:49:27 +00:00
# key and crypto modules different
type
  # Aliases for the key types of the `crypto` module (not `keys`).
  KeyPair* = crypto.KeyPair
  PublicKey* = crypto.PublicKey
  PrivateKey* = crypto.PrivateKey

  # XXX: Weird type, should probably be using pubsub Topic object name?
  Topic* = string

  Message* = seq[byte]

  WakuInfo* = object
    # NOTE One for simplicity, can extend later as needed
    listenStr*: string
    #multiaddrStrings*: seq[string]

  # NOTE based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object of RootObj
    peerManager*: PeerManager
    switch*: Switch
    # Protocol handles; each one stays nil until its `mount*` proc assigns it.
    wakuRelay*: WakuRelay
    wakuStore*: WakuStore
    wakuFilter*: WakuFilter
    wakuSwap*: WakuSwap
    wakuRlnRelay*: WakuRLNRelay
    wakuLightPush*: WakuLightPush
    wakuKeepalive*: WakuKeepalive
    peerInfo*: PeerInfo
    libp2pTransportLoops*: seq[Future[void]] # Set from `switch.start()`
    # TODO Revisit messages field indexing as well as if this should be Message or WakuMessage
    messages*: seq[(Topic, WakuMessage)]
    filters*: Filters # Content filter handlers, keyed by request id
    subscriptions*: MessageNotificationSubscriptions
    rng*: ref BrHmacDrbgContext
    started*: bool # Indicates that node has started listening
2020-11-24 04:34:32 +00:00
2020-07-24 01:39:58 +00:00
# NOTE Any difference here in Waku vs Eth2?
# E.g. Devp2p/Libp2p support, etc.
#func asLibp2pKey*(key: keys.PublicKey): PublicKey =
# PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
func asEthKey*(key: PrivateKey): keys.PrivateKey =
  ## Re-wraps the `skkey` field of `key` as a `keys.PrivateKey`.
  result = keys.PrivateKey(key.skkey)
2020-11-10 07:13:16 +00:00
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
  ## Removes every content filter whose `contentTopic` appears in
  ## `contentFilters` from all entries of `filters`, then deletes the entries
  ## that are left without any content filter.

  # Collect the content topics that are being unsubscribed.
  let topicsToDrop = contentFilters.mapIt(it.contentTopic)

  debug " unsubscribing ", unsubscribeTopics = topicsToDrop

  # Entries cannot be deleted while the table is being iterated, so the
  # ids of emptied entries are gathered first and removed afterwards.
  var emptiedIds: seq[string]

  for requestId, entry in filters.mpairs:
    entry.contentFilters.keepItIf(it.contentTopic notin topicsToDrop)

    if entry.contentFilters.len == 0:
      emptiedIds.add(requestId)

  for requestId in emptiedIds:
    filters.del(requestId)

  debug " filters modified ", filters = filters
2020-07-24 01:39:58 +00:00
template tcpEndPoint(address, port): auto =
  ## Builds a TCP `MultiAddress` from `address` and `port`.
  init(MultiAddress, address, tcpProtocol, port)
2020-09-01 02:09:54 +00:00
## Public API
##
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
           bindIp: ValidIpAddress, bindPort: Port,
           extIp = none[ValidIpAddress](), extPort = none[Port](),
           peerStorage: PeerStorage = nil): T =
  ## Creates a Waku Node.
  ##
  ## The node binds to `bindIp`/`bindPort`. An external address is announced
  ## only when both `extIp` and `extPort` are set. `peerStorage` is handed to
  ## the peer manager as-is.
  ##
  ## Status: Implemented.
  ##
  let
    rng = crypto.newRng()
    hostAddress = tcpEndPoint(bindIp, bindPort)
    announcedAddresses =
      if extIp.isSome() and extPort.isSome():
        @[tcpEndPoint(extIp.get(), extPort.get())]
      else:
        @[]
    peerInfo = PeerInfo.init(nodeKey)

  info " Initializing networking ", hostAddress, announcedAddresses

  # XXX: Add this when we create node or start it?
  # The bind address goes first (index 0), announced addresses follow it.
  peerInfo.addrs.add(hostAddress)
  for announced in announcedAddresses:
    peerInfo.addrs.add(announced)

  let switch = newStandardSwitch(
    some(nodeKey),
    hostAddress,
    transportFlags = {ServerFlags.ReuseAddr},
    rng = rng)

  # TODO Untested - verify behavior after switch interface change
  # More like this:
  # let pubsub = GossipSub.init(
  #    switch = switch,
  #    msgIdProvider = msgIdProvider,
  #    triggerSelf = true, sign = false,
  #    verifySignature = false).PubSub
  result = WakuNode(
    peerManager: PeerManager.new(switch, peerStorage),
    switch: switch,
    rng: rng,
    peerInfo: peerInfo,
    subscriptions: newTable[string, MessageNotificationSubscription](),
    filters: initTable[string, Filter]()
  )
2020-09-11 11:28:27 +00:00
2020-09-01 02:09:54 +00:00
proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku Node.
  ##
  ## Starts the switch, logs the listen address, starts WakuRelay if it is
  ## mounted and finally flags the node as started.
  ##
  ## Status: Implemented.
  ##
  node.libp2pTransportLoops = await node.switch.start()

  # TODO Get this from WakuNode obj
  let nodePeerInfo = node.peerInfo
  info " PeerInfo ", peerId = nodePeerInfo.peerId, addrs = nodePeerInfo.addrs

  # The last entry of `addrs` is the one that gets logged.
  # XXX: this should be /ip4..., / stripped?
  let
    lastAddr = $nodePeerInfo.addrs[^1]
    listenStr = lastAddr & " /p2p/ " & $nodePeerInfo.peerId
  info " Listening on ", full = listenStr

  if not isNil(node.wakuRelay):
    await node.wakuRelay.start()

  node.started = true
2020-09-02 03:15:25 +00:00
proc stop*(node: WakuNode) {.async.} =
  ## Stops WakuRelay (when mounted), then the switch, and clears the
  ## `started` flag.
  let relay = node.wakuRelay
  if not isNil(relay):
    await relay.stop()

  await node.switch.stop()

  node.started = false
2021-04-13 05:56:49 +00:00
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
  ## Internal subscribe. Registers the default handler the first time `topic`
  ## is seen and, when `handler` is set, registers that handler as well.
  ## Logs an error and does nothing when WakuRelay is not mounted.
  let wakuRelay = node.wakuRelay

  if isNil(wakuRelay):
    error " Invalid API call to `subscribe`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " subscribe ", topic = topic

  proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
    # A default handler should be registered for all topics
    trace " Hit default handler ", topic = topic, data = data

    let decoded = WakuMessage.init(data)
    if decoded.isOk():
      # Trigger subscription handlers on a store/filter node
      await node.subscriptions.notify(topic, decoded.value())
      waku_node_messages.inc(labelValues = [" relay "])

  # Add default handler only for new topics
  if topic notin PubSub(wakuRelay).topics:
    debug " Registering default handler ", topic = topic
    wakuRelay.subscribe(topic, defaultHandler)

  if handler.isSome:
    debug " Registering handler ", topic = topic
    wakuRelay.subscribe(topic, handler.get())
2021-02-02 11:33:59 +00:00
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Subscribes to a PubSub topic. Triggers handler when receiving messages on
  ## this topic. TopicHandler is a method that takes a topic and some data.
  ##
  ## NOTE The data field SHOULD be decoded as a WakuMessage.
  ## Status: Implemented.
  subscribe(node, topic, some(handler))
2020-07-27 09:01:06 +00:00
2020-10-02 12:48:56 +00:00
proc subscribe*(node: WakuNode, request: FilterRequest,
                handler: ContentFilterHandler) {.async, gcsafe.} =
  ## Registers for messages that match a specific filter. Triggers the handler
  ## whenever a message is received.
  ##
  ## The handler is always stored in `node.filters`. It is keyed by the id
  ## returned from the remote subscription when that succeeds, otherwise by a
  ## locally generated request id.
  ##
  ## Status: Implemented.

  # Sanity check for well-formed subscribe FilterRequest
  doAssert(request.subscribe, " invalid subscribe request ")

  info " subscribe content ", filter = request

  # Fallback id, replaced below if the remote side hands one back.
  var requestId = generateRequestId(node.rng)

  if not isNil(node.wakuFilter):
    let remoteId = await node.wakuFilter.subscribe(request)

    if remoteId.isNone():
      # Failed to subscribe
      error " remote subscription to filter failed ", filter = request
      waku_node_errors.inc(labelValues = [" subscribe_filter_failure "])
    else:
      # Subscribed successfully.
      requestId = remoteId.get()

  # Register handler for filter, whether remote subscription succeeded or not
  node.filters[requestId] = Filter(contentFilters: request.contentFilters,
                                   handler: handler)
  waku_node_filters.set(node.filters.len.int64)
2021-02-02 11:33:59 +00:00
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Unsubscribes a handler from a PubSub topic.
  ## Logs an error and does nothing when WakuRelay is not mounted.
  ##
  ## Status: Implemented.
  let wakuRelay = node.wakuRelay

  if isNil(wakuRelay):
    error " Invalid API call to `unsubscribe`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " unsubscribe ", topic = topic

  wakuRelay.unsubscribe(@[(topic, handler)])
2020-10-27 01:13:56 +00:00
2021-02-02 11:33:59 +00:00
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
  ## Unsubscribes all handlers registered on a specific PubSub topic.
  ## Logs an error and does nothing when WakuRelay is not mounted.
  ##
  ## Status: Implemented.
  let wakuRelay = node.wakuRelay

  if isNil(wakuRelay):
    error " Invalid API call to `unsubscribeAll`. WakuRelay not mounted. "
    # @TODO improved error handling
    return

  info " unsubscribeAll ", topic = topic

  wakuRelay.unsubscribeAll(topic)
2020-10-27 01:13:56 +00:00
2020-07-27 09:01:06 +00:00
2020-11-10 07:13:16 +00:00
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
  ## Unsubscribe from a content filter.
  ##
  ## Forwards the request to the filter protocol when it is mounted and then
  ## removes the matching content filters from `node.filters`.
  ##
  ## Status: Implemented.

  # Sanity check for well-formed unsubscribe FilterRequest
  doAssert(request.subscribe == false, " invalid unsubscribe request ")

  info " unsubscribe content ", filter = request

  # The content-filter `subscribe` registers handlers locally even when
  # WakuFilter is not mounted, so the local registrations must be removable
  # in that case too instead of dereferencing a nil protocol.
  if not node.wakuFilter.isNil:
    await node.wakuFilter.unsubscribe(request)

  node.filters.removeContentFilters(request.contentFilters)

  waku_node_filters.set(node.filters.len.int64)
2020-11-16 09:55:49 +00:00
2021-03-16 18:18:40 +00:00
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage,
              rlnRelayEnabled: bool = false) {.async, gcsafe.} =
  ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
  ## `contentTopic` field for light node functionality. This field may be also
  ## be omitted.
  ##
  ## Logs an error and does nothing when WakuRelay is not mounted.
  ##
  ## Status: Implemented.
  ## When rlnRelayEnabled is true, a zkp will be generated and attached to the
  ## message (it is an experimental feature, only compiled in with `-d:rln`).
  if node.wakuRelay.isNil:
    error " Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead. "
    # @TODO improved error handling
    return

  let wakuRelay = node.wakuRelay

  debug " publish ", topic = topic, contentTopic = message.contentTopic

  # `message` is immutable, so the proof is attached to a copy of it.
  var publishingMessage = message

  when defined(rln):
    if rlnRelayEnabled:
      # if rln relay is enabled then a proof must be generated and added to the waku message
      let proof = proofGen(message.payload)
      ## TODO it might be better to change message type to mutable (i.e., var)
      ## so that we can add the proof field to the original message
      publishingMessage = WakuMessage(payload: message.payload,
                                      contentTopic: message.contentTopic,
                                      version: message.version,
                                      proof: proof)

  # Encode the (possibly proof-carrying) copy; encoding `message` here would
  # drop the proof generated above.
  let data = publishingMessage.encode().buffer

  discard await wakuRelay.publish(topic, data)
2020-07-27 09:01:06 +00:00
2021-04-24 04:56:37 +00:00
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage,
                handler: PushResponseHandler) {.async, gcsafe.} =
  ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
  ## Returns whether relaying was successful or not in `handler`.
  ## `WakuMessage` should contain a `contentTopic` field for light node
  ## functionality. This field may be also be omitted.
  ##
  ## Status: Implemented.
  debug " Publishing with lightpush ", topic = topic, contentTopic = message.contentTopic

  await node.wakuLightPush.request(
    PushRequest(pubSubTopic: topic, message: message),
    handler)
2020-10-06 03:33:28 +00:00
proc query*(node: WakuNode, query: HistoryQuery,
            handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Queries known nodes for historical messages. Triggers the handler whenever
  ## a response is received.
  ## QueryHandlerFunc is a method that takes a HistoryResponse.
  ##
  ## Status: Implemented.

  # TODO Once waku swap is less experimental, this can simplified
  if not isNil(node.wakuSwap):
    debug " Using SWAPAccounting query "
    # TODO wakuSwap now part of wakuStore object
    await node.wakuStore.queryWithAccounting(query, handler)
  else:
    debug " Using default query "
    await node.wakuStore.query(query, handler)
2020-11-16 09:55:49 +00:00
2021-05-26 19:33:22 +00:00
proc resume*(node: WakuNode,
             peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])) {.async, gcsafe.} =
  ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
  ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
  ## messages are stored in the wakuStore's messages field and in the message db
  ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
  ## an offset of 20 second is added to the time window to count for nodes asynchrony
  ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
  ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
  ## The history gets fetched successfully if the dialed peer has been online during the queried time window.

  # Nothing to do when the store protocol is not mounted.
  if isNil(node.wakuStore):
    return

  let outcome = await node.wakuStore.resume(peerList)
  if outcome.isOk:
    info " the number of retrieved messages since the last online time: ", number = outcome.value
2020-10-06 03:33:28 +00:00
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
  ## Returns information about the Node, such as what multiaddress it can be reached at.
  ##
  ## The listen string is built from the last entry of the node's `addrs`
  ## and its peer id.
  ##
  ## Status: Implemented.
  ##

  # TODO Generalize this for other type of multiaddresses
  let
    lastAddr = $node.peerInfo.addrs[^1]
    peerIdStr = $node.peerInfo.peerId
  result = WakuInfo(listenStr: lastAddr & " /p2p/ " & peerIdStr)
2020-07-27 09:01:06 +00:00
2020-10-20 02:36:27 +00:00
proc mountFilter*(node: WakuNode) =
  ## Creates the WakuFilter protocol, mounts it on the switch and registers
  ## its subscription under `WakuFilterCodec`.
  info " mounting filter "

  proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
    info " push received "
    for pushed in msg.messages:
      # Trigger filter handlers on a light node
      node.filters.notify(pushed, requestId)
      waku_node_messages.inc(labelValues = [" filter "])

  let filterProto = WakuFilter.init(node.peerManager, node.rng, filterHandler)
  node.wakuFilter = filterProto
  node.switch.mount(node.wakuFilter)
  node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
2020-11-24 04:53:42 +00:00
# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
2020-11-18 12:45:51 +00:00
proc mountSwap*(node: WakuNode) =
  ## Creates the WakuSwap protocol and mounts it on the switch.
  info " mounting swap "

  let swapProto = WakuSwap.init(node.peerManager, node.rng)
  node.wakuSwap = swapProto
  node.switch.mount(node.wakuSwap)
  # NYI - Do we need this?
  #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
2021-05-06 13:43:43 +00:00
proc mountStore*(node: WakuNode, store: MessageStore = nil,
                 persistMessages: bool = false) =
  ## Creates the WakuStore protocol (passing it the swap protocol if that is
  ## already mounted) and mounts it on the switch. The store subscription is
  ## registered under `WakuStoreCodec` only when `persistMessages` is true.
  info " mounting store "

  if not isNil(node.wakuSwap):
    debug " mounting store with swap "
    node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap)
  else:
    debug " mounting store without swap "
    node.wakuStore = WakuStore.init(node.peerManager, node.rng, store)

  node.switch.mount(node.wakuStore)

  if persistMessages:
    node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
2020-11-24 04:53:42 +00:00
2021-06-08 18:56:32 +00:00
when defined(rln):
  proc mountRlnRelay*(node: WakuNode,
                      ethClientAddress: Option[string] = none(string),
                      ethAccountAddress: Option[Address] = none(Address),
                      membershipContractAddress: Option[Address] = none(Address)) {.async.} =
    ## Creates an RLN instance, generates membership keys, registers the
    ## resulting `WakuRLNRelay` peer and stores it in `node.wakuRlnRelay`.
    ## Every step is guarded by a `doAssert`, including the presence of all
    ## three address arguments.

    # TODO return a bool value to indicate the success of the call
    # check whether inputs are provided
    doAssert(ethClientAddress.isSome())
    doAssert(ethAccountAddress.isSome())
    doAssert(membershipContractAddress.isSome())

    # create an RLN instance
    var
      rlnCtx = RLN[Bn256]()
      rlnCtxPtr = addr(rlnCtx)
    doAssert(createRLNInstance(32, rlnCtxPtr))

    # generate the membership keys and check whether they are generated
    let keyPairOpt = membershipKeyGen(rlnCtxPtr)
    doAssert(keyPairOpt.isSome())
    debug " the membership key for the rln relay is generated "

    # initialize the WakuRLNRelay
    var rlnPeer = WakuRLNRelay(
      membershipKeyPair: keyPairOpt.get(),
      ethClientAddress: ethClientAddress.get(),
      ethAccountAddress: ethAccountAddress.get(),
      membershipContractAddress: membershipContractAddress.get())

    # register the rln-relay peer to the membership contract
    # and check whether registration is done
    let registered = await rlnPeer.register()
    doAssert(registered)
    debug " peer is successfully registered into the membership contract "

    node.wakuRlnRelay = rlnPeer
when defined(rln):
  proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
    ## this procedure is a thin wrapper for the pubsub addValidator method
    ## it sets message validator on the given pubsubTopic, the validator will check that
    ## all the messages published in the pubsubTopic have a valid zero-knowledge proof
    proc validator(topic: string,
                   message: messages.Message): Future[ValidationResult] {.async.} =
      # Reject by default. Relying on the implicit zero value of `result`
      # would let undecodable messages and failed proofs through whenever
      # the first member of `ValidationResult` is `Accept`.
      result = ValidationResult.Reject

      let msg = WakuMessage.init(message.data)
      if msg.isOk():
        # check the proof
        if proofVrfy(msg.value().payload, msg.value().proof):
          result = ValidationResult.Accept

    # set a validator for the pubsubTopic
    let pb = PubSub(node.wakuRelay)
    pb.addValidator(pubsubTopic, validator)
2021-03-16 18:18:40 +00:00
2021-05-04 12:11:41 +00:00
proc mountRelay*(node: WakuNode,
                 topics: seq[string] = newSeq[string](),
                 rlnRelayEnabled = false,
                 relayMessages = true,
                 triggerSelf = true) {.gcsafe.} =
  ## Creates WakuRelay and mounts it on the switch. Unless `relayMessages` is
  ## false, it also stores the relay on the node, subscribes to the default
  ## topic and to `topics`, reconnects to previously known relay peers and,
  ## if the node has already started, starts the relay protocol.
  let relay = WakuRelay.init(
    switch = node.switch,
    # Use default
    #msgIdProvider = msgIdProvider,
    triggerSelf = triggerSelf,
    sign = false,
    verifySignature = false
  )

  info " mounting relay ", rlnRelayEnabled = rlnRelayEnabled, relayMessages = relayMessages

  node.switch.mount(relay)

  if not relayMessages:
    ## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes).
    ## All nodes, however, currently require WakuRelay, regardless of desired capabilities.
    ## This is to allow protocol stream negotiation with relay-capable nodes to succeed.
    ## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub
    ## topics. We also never start the relay protocol. node.wakuRelay remains nil.
    ## @TODO: in future, this WakuRelay dependency will be removed completely
    return

  node.wakuRelay = relay

  # The default topic is always subscribed, caller-supplied topics follow.
  node.subscribe(defaultTopic, none(TopicHandler))
  for extraTopic in topics:
    node.subscribe(extraTopic, none(TopicHandler))

  if node.peerManager.hasPeers(WakuRelayCodec):
    trace " Found previous WakuRelay peers. Reconnecting. "

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let backoff = relay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
    waitFor node.peerManager.reconnectPeers(WakuRelayCodec, backoff)

  when defined(rln):
    if rlnRelayEnabled:
      # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
      info " WakuRLNRelay is enabled "
      waitFor mountRlnRelay(node)
      # TODO currently the message validator is set for the defaultTopic, this can be configurable to accept other pubsub topics as well
      addRLNRelayValidator(node, defaultTopic)
      info " WakuRLNRelay is mounted successfully "

  if node.started:
    # Node has already started. Start the WakuRelay protocol
    waitFor node.wakuRelay.start()

    info " relay mounted and started successfully "
2021-02-22 17:40:02 +00:00
2021-04-24 04:56:37 +00:00
proc mountLightPush*(node: WakuNode) =
  ## Creates the WakuLightPush protocol (passing it the relay if that is
  ## already mounted) and mounts it on the switch.
  info " mounting light push "

  if not isNil(node.wakuRelay):
    debug " mounting lightpush with relay "
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
  else:
    debug " mounting lightpush without relay "
    node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)

  node.switch.mount(node.wakuLightPush)
2021-06-02 07:53:34 +00:00
proc mountKeepalive*(node: WakuNode) =
  ## Creates the WakuKeepalive protocol and mounts it on the switch.
  info " mounting keepalive "

  let keepaliveProto = WakuKeepalive.new(node.peerManager, node.rng)
  node.wakuKeepalive = keepaliveProto
  node.switch.mount(node.wakuKeepalive)
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
  ## While `node.started` is true, calls `keepAllAlive` and then sleeps for
  ## `keepalive` before the next round.
  while true:
    if not node.started:
      break

    # Keep all managed peers alive when idle
    trace " Running keepalive "

    await node.wakuKeepalive.keepAllAlive()

    await sleepAsync(keepalive)
proc startKeepalive*(node: WakuNode) =
  ## Spawns the keepalive loop with the default interval, without awaiting it.
  let defaultKeepalive = minutes(5) # 50% of the default chronosstream timeout duration

  info " starting keepalive ", keepalive = defaultKeepalive

  asyncSpawn keepaliveLoop(node, defaultKeepalive)
2020-10-21 09:54:29 +00:00
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
  ## Parses `address` into peer info and dials that peer on the WakuRelay
  ## protocol through the peer manager. The dial result is discarded.
  info " dialPeer ", address = address

  # XXX: This turns ipfs into p2p, not quite sure why
  let target = parsePeerInfo(address)

  info " Dialing peer ", wireAddr = target.addrs[0], peerId = target.peerId

  # NOTE This is dialing on WakuRelay protocol specifically
  discard await n.peerManager.dialPeer(target, WakuRelayCodec)

  info " Post peerManager dial "
2020-09-11 04:16:45 +00:00
2020-10-21 09:54:29 +00:00
proc setStorePeer*(n: WakuNode, address: string) =
  ## Parses `address` and sets the result as the store protocol's peer.
  info " Set store peer ", address = address

  n.wakuStore.setPeer(parsePeerInfo(address))
2020-09-25 11:35:32 +00:00
2020-10-21 09:54:29 +00:00
proc setFilterPeer*(n: WakuNode, address: string) =
  ## Parses `address` and sets the result as the filter protocol's peer.
  info " Set filter peer ", address = address

  n.wakuFilter.setPeer(parsePeerInfo(address))
2020-10-09 13:58:50 +00:00
2021-06-04 10:02:47 +00:00
proc setLightPushPeer*(n: WakuNode, address: string) =
  ## Parses `address` and sets the result as the lightpush protocol's peer.
  info " Set lightpush peer ", address = address

  n.wakuLightPush.setPeer(parsePeerInfo(address))
2020-10-22 11:12:00 +00:00
proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
  ## Dials every address in `nodes` one after another, then sleeps for five
  ## seconds before returning.
  for address in nodes:
    info " connectToNodes ", node = address
    # XXX: This seems...brittle
    await n.dialPeer(address)

  # The issue seems to be around peers not being fully connected when
  # trying to subscribe. So what we do is sleep to guarantee nodes are
  # fully connected.
  #
  # This issue was known to Dmitiry on nim-libp2p and may be resolvable
  # later.
  await sleepAsync(seconds(5))
proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
  ## Dials every peer in `nodes` on the WakuRelay protocol through the peer
  ## manager (dial results are discarded), then sleeps for five seconds
  ## before returning.
  for candidate in nodes:
    info " connectToNodes ", peer = candidate
    discard await n.peerManager.dialPeer(candidate, WakuRelayCodec)

  # The issue seems to be around peers not being fully connected when
  # trying to subscribe. So what we do is sleep to guarantee nodes are
  # fully connected.
  #
  # This issue was known to Dmitiry on nim-libp2p and may be resolvable
  # later.
  await sleepAsync(seconds(5))
2020-10-09 13:58:50 +00:00
2020-10-21 09:54:29 +00:00
when isMainModule :
import
2021-04-15 08:18:14 +00:00
system / ansi_c ,
2020-10-21 09:54:29 +00:00
confutils , json_rpc / rpcserver , metrics ,
2020-12-24 08:02:30 +00:00
. / config ,
. / jsonrpc / [ admin_api ,
debug_api ,
filter_api ,
private_api ,
relay_api ,
store_api ] ,
2021-03-25 08:37:11 +00:00
. / storage / message / waku_message_store ,
2021-03-26 08:49:51 +00:00
. / storage / peer / waku_peer_storage ,
2020-11-17 09:34:53 +00:00
.. / .. / common / utils / nat
2020-09-11 04:16:45 +00:00
2020-12-24 08:02:30 +00:00
proc startRpc ( node : WakuNode , rpcIp : ValidIpAddress , rpcPort : Port , conf : WakuNodeConf ) =
2020-09-11 04:16:45 +00:00
let
ta = initTAddress ( rpcIp , rpcPort )
rpcServer = newRpcHttpServer ( [ ta ] )
2020-12-24 08:02:30 +00:00
installDebugApiHandlers ( node , rpcServer )
# Install enabled API handlers:
if conf . relay :
let topicCache = newTable [ string , seq [ WakuMessage ] ] ( )
installRelayApiHandlers ( node , rpcServer , topicCache )
if conf . rpcPrivate :
# Private API access allows WakuRelay functionality that
# is backwards compatible with Waku v1.
installPrivateApiHandlers ( node , rpcServer , node . rng , topicCache )
if conf . filter :
let messageCache = newTable [ ContentTopic , seq [ WakuMessage ] ] ( )
installFilterApiHandlers ( node , rpcServer , messageCache )
if conf . store :
installStoreApiHandlers ( node , rpcServer )
if conf . rpcAdmin :
installAdminApiHandlers ( node , rpcServer )
2020-09-11 04:16:45 +00:00
rpcServer . start ( )
info " RPC Server started " , ta
proc startMetricsServer ( serverIp : ValidIpAddress , serverPort : Port ) =
info " Starting metrics HTTP server " , serverIp , serverPort
2021-06-02 14:54:38 +00:00
2021-06-09 14:37:08 +00:00
startMetricsHttpServer ( $ serverIp , serverPort )
2020-09-11 04:16:45 +00:00
2021-06-02 14:54:38 +00:00
info " Metrics HTTP server started " , serverIp , serverPort
2020-09-11 04:16:45 +00:00
proc startMetricsLog ( ) =
2021-03-26 09:52:04 +00:00
# https://github.com/nim-lang/Nim/issues/17369
var logMetrics : proc ( udata : pointer ) {. gcsafe , raises : [ Defect ] . }
logMetrics = proc ( udata : pointer ) =
2020-09-11 04:16:45 +00:00
{. gcsafe . } :
# TODO: libp2p_pubsub_peers is not public, so we need to make this either
# public in libp2p or do our own peer counting after all.
2021-01-29 08:42:41 +00:00
var
totalMessages = 0 . float64
for key in waku_node_messages . metrics . keys ( ) :
2021-03-26 09:52:04 +00:00
try :
totalMessages = totalMessages + waku_node_messages . value ( key )
except KeyError :
discard
2020-09-11 04:16:45 +00:00
info " Node metrics " , totalMessages
discard setTimer ( Moment . fromNow ( 2 . seconds ) , logMetrics )
discard setTimer ( Moment . fromNow ( 2 . seconds ) , logMetrics )
2021-03-26 08:49:51 +00:00
2020-09-01 02:09:54 +00:00
# Load the node configuration. WakuNodeConf.load() is defined outside this
# part of the file — presumably it parses the command line; TODO confirm.
let
conf = WakuNodeConf . load ( )
2021-03-26 08:49:51 +00:00
# Storage setup
# sqliteDatabase keeps its default value unless a dbPath is configured and
# the database initialises successfully; later code tests it with isNil.
var sqliteDatabase : SqliteDatabase
2021-05-12 10:55:09 +00:00
# Only try to open the database when conf.dbPath has been set.
if conf . dbPath ! = " " :
2021-05-11 15:07:26 +00:00
let dbRes = SqliteDatabase . init ( conf . dbPath )
2021-03-26 08:49:51 +00:00
# A failed init is not fatal: it is logged, counted in waku_node_errors,
# and startup continues without a database.
if dbRes . isErr :
warn " failed to init database " , err = dbRes . error
waku_node_errors . inc ( labelValues = [ " init_db_failure " ] )
else :
sqliteDatabase = dbRes . value
# Peer storage is optional: it is created only when conf.persistPeers is set
# and a database is available; otherwise pStorage keeps its default value.
var pStorage : WakuPeerStorage
2021-05-12 10:55:09 +00:00
if conf . persistPeers and not sqliteDatabase . isNil :
2021-03-26 08:49:51 +00:00
let res = WakuPeerStorage . new ( sqliteDatabase )
# Same non-fatal handling as above.
# NOTE(review): the error label says init_store_failure although this is the
# peer storage — confirm whether a dedicated label was intended.
if res . isErr :
warn " failed to init new WakuPeerStorage " , err = res . error
waku_node_errors . inc ( labelValues = [ " init_store_failure " ] )
else :
pStorage = res . value
# Resolve the external address through setupNat, then construct and start the
# node. Both the TCP and the UDP bind port are offset by conf.portsShift.
# extUdpPort is bound here but not referenced again in the visible code.
let
2020-09-01 02:09:54 +00:00
( extIp , extTcpPort , extUdpPort ) = setupNat ( conf . nat , clientId ,
Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) ,
Port ( uint16 ( conf . udpPort ) + conf . portsShift ) )
2021-01-15 07:37:20 +00:00
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
# extPort falls back to the (shifted) bind port only when an external IP is
# known but setupNat returned no external TCP port.
extPort = if extIp . isSome ( ) and extTcpPort . isNone ( ) : some ( Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) )
else : extTcpPort
2021-05-11 15:07:26 +00:00
# The node receives the node key, the listen address with the shifted TCP
# port, the external ip/port computed above, and the optional peer storage.
node = WakuNode . init ( conf . nodekey ,
2021-03-26 08:49:51 +00:00
conf . listenAddress , Port ( uint16 ( conf . tcpPort ) + conf . portsShift ) ,
extIp , extPort ,
pStorage )
2020-09-01 02:09:54 +00:00
# Block until the node has started before any protocol is mounted.
waitFor node . start ( )
2020-11-21 05:31:48 +00:00
# Protocol mounting. The order of the sections below matters: per the in-line
# notes, resume, lightpush and filter all have to come after the relay setup.
# Swap is mounted only when enabled in the configuration.
if conf . swap :
2020-11-18 12:45:51 +00:00
mountSwap ( node )
2020-11-16 09:55:49 +00:00
2020-11-26 10:02:10 +00:00
# TODO Set swap peer, for now should be same as store peer
2021-03-30 13:33:19 +00:00
# Store setup
# Store is mounted when the store option is enabled or a storenode is set.
if ( conf . storenode ! = " " ) or ( conf . store ) :
2021-01-22 09:39:16 +00:00
# store keeps its default value unless message persistence is requested and
# a database is available; mountStore is called in either case.
var store : WakuMessageStore
2020-11-16 08:38:52 +00:00
2021-05-11 15:07:26 +00:00
if ( not sqliteDatabase . isNil ) and conf . persistMessages :
2021-03-26 08:49:51 +00:00
let res = WakuMessageStore . init ( sqliteDatabase )
2020-11-16 08:38:52 +00:00
# Non-fatal: the failure is logged and counted, and store is left unset.
if res . isErr :
2021-01-22 09:39:16 +00:00
warn " failed to init WakuMessageStore " , err = res . error
2021-01-29 08:42:41 +00:00
waku_node_errors . inc ( labelValues = [ " init_store_failure " ] )
2020-11-16 08:38:52 +00:00
else :
store = res . value
2021-05-11 15:07:26 +00:00
mountStore ( node , store , conf . persistMessages )
2020-11-24 04:53:42 +00:00
2021-03-30 13:33:19 +00:00
# Register the configured store node as the store peer.
if conf . storenode ! = " " :
setStorePeer ( node , conf . storenode )
2021-05-13 21:21:46 +00:00
2021-03-30 13:33:19 +00:00
# Relay setup
# conf.topics is one string; it is split on the separator literal below to
# obtain the topic list handed to mountRelay.
2021-05-04 12:11:41 +00:00
mountRelay ( node ,
conf . topics . split ( " " ) ,
2021-05-12 10:55:09 +00:00
rlnRelayEnabled = conf . rlnRelay ,
2021-05-04 12:11:41 +00:00
relayMessages = conf . relay ) # Indicates if node is capable to relay messages
2021-05-28 18:41:29 +00:00
2021-06-02 07:53:34 +00:00
# Keepalive mounted on all nodes
mountKeepalive ( node )
2021-05-28 18:41:29 +00:00
# Resume historical messages, this has to be called after the relay setup
# Only done when the node both runs store and persists messages; blocks
# until resume() completes.
if conf . store and conf . persistMessages :
waitFor node . resume ( )
2021-05-04 12:11:41 +00:00
# Connect to the statically configured nodes, if any; blocks until
# connectToNodes completes.
if conf . staticnodes . len > 0 :
waitFor connectToNodes ( node , conf . staticnodes )
2020-10-09 13:58:50 +00:00
2021-04-24 04:56:37 +00:00
# NOTE Must be mounted after relay
# Lightpush follows the same pattern as store: mount when enabled or when a
# lightpush node is configured, then register that node as the peer.
2021-06-04 10:02:47 +00:00
if ( conf . lightpushnode ! = " " ) or ( conf . lightpush ) :
2021-04-24 04:56:37 +00:00
mountLightPush ( node )
2021-06-04 10:02:47 +00:00
if conf . lightpushnode ! = " " :
setLightPushPeer ( node , conf . lightpushnode )
2021-04-29 04:54:31 +00:00
# Filter setup. NOTE Must be mounted after relay
if ( conf . filternode ! = " " ) or ( conf . filter ) :
mountFilter ( node )
if conf . filternode ! = " " :
setFilterPeer ( node , conf . filternode )
2021-04-24 04:56:37 +00:00
2020-09-01 02:09:54 +00:00
# Optional services, each gated by its own configuration flag.
# JSON-RPC server; its port is offset by conf.portsShift like the node ports.
if conf . rpc :
2020-12-24 08:02:30 +00:00
startRpc ( node , conf . rpcAddress , Port ( conf . rpcPort + conf . portsShift ) , conf )
2020-09-01 02:09:54 +00:00
2021-05-12 10:55:09 +00:00
# Periodic metrics summary written to the log.
if conf . metricsLogging :
2020-09-01 02:09:54 +00:00
startMetricsLog ( )
2021-06-10 14:18:41 +00:00
# Metrics HTTP server; its port is offset by conf.portsShift as well.
if conf . metricsServer :
startMetricsServer ( conf . metricsServerAddress ,
Port ( conf . metricsServerPort + conf . portsShift ) )
2021-04-15 08:18:14 +00:00
# Graceful shutdown on Ctrl-C (SIGINT): log, stop the node, exit successfully.
proc handleCtrlC() {.noconv.} =
  when defined(windows):
    # workaround for https://github.com/nim-lang/Nim/issues/4057
    setupForeignThreadGc()

  info " Shutting down after receiving SIGINT "
  waitFor(node.stop())
  quit QuitSuccess

# Install the handler as the process-wide Ctrl-C hook.
setControlCHook handleCtrlC
# SIGTERM handling, compiled in on POSIX targets only: log, stop the node,
# exit successfully.
when defined(posix):
  proc handleSigterm(signal: cint) {.noconv.} =
    info " Shutting down after receiving SIGTERM "
    waitFor(node.stop())
    quit QuitSuccess

  # Register the handler for SIGTERM.
  c_signal(SIGTERM, handleSigterm)
2021-06-02 07:53:34 +00:00
# Keepalive is started only when enabled in the configuration.
if conf.keepAlive:
  startKeepalive(node)

# Hand control to the event loop.
runForever()