2022-11-04 09:52:27 +00:00
when ( NimMajor , NimMinor ) < ( 1 , 4 ) :
{. push raises : [ Defect ] . }
else :
{. push raises : [ ] . }
2022-10-18 14:05:53 +00:00
import
std / [ hashes , options , tables , strutils , sequtils , os ] ,
chronos , chronicles , metrics ,
2022-11-23 09:08:00 +00:00
stew / results ,
2022-10-18 14:05:53 +00:00
stew / byteutils ,
2022-11-23 09:08:00 +00:00
stew / shims / net as stewNet ,
2022-10-18 14:05:53 +00:00
eth / keys ,
nimcrypto ,
bearssl / rand ,
eth / p2p / discoveryv5 / enr ,
libp2p / crypto / crypto ,
libp2p / protocols / ping ,
2022-11-04 09:52:08 +00:00
libp2p / protocols / pubsub / gossipsub ,
libp2p / protocols / pubsub / rpc / messages ,
2023-01-11 09:57:49 +00:00
libp2p / protocols / connectivity / autonat / client ,
libp2p / protocols / connectivity / autonat / service ,
2022-10-18 14:05:53 +00:00
libp2p / nameresolving / nameresolver ,
2022-11-04 09:52:08 +00:00
libp2p / builders ,
libp2p / multihash ,
libp2p / transports / tcptransport ,
libp2p / transports / wstransport
2022-10-18 14:05:53 +00:00
import
2022-11-04 09:52:08 +00:00
.. / protocol / waku_message ,
.. / protocol / waku_relay ,
2022-11-23 09:08:00 +00:00
.. / protocol / waku_archive ,
2022-10-18 14:05:53 +00:00
.. / protocol / waku_store ,
2022-11-04 09:52:08 +00:00
.. / protocol / waku_store / client as store_client ,
2022-10-18 14:05:53 +00:00
.. / protocol / waku_swap / waku_swap ,
.. / protocol / waku_filter ,
2022-11-04 09:52:08 +00:00
.. / protocol / waku_filter / client as filter_client ,
2022-10-18 14:05:53 +00:00
.. / protocol / waku_lightpush ,
2022-11-04 09:52:08 +00:00
.. / protocol / waku_lightpush / client as lightpush_client ,
2022-10-18 14:05:53 +00:00
.. / protocol / waku_peer_exchange ,
2022-11-21 08:36:41 +00:00
.. / utils / peers ,
2022-11-02 10:59:58 +00:00
.. / utils / wakuenr ,
2023-02-03 08:06:21 +00:00
.. / utils / time ,
2023-02-06 09:03:30 +00:00
. / peer_manager ,
2022-10-18 14:05:53 +00:00
. / dnsdisc / waku_dnsdisc ,
. / discv5 / waku_discv5 ,
. / wakuswitch
2022-11-14 13:59:34 +00:00
when defined ( rln ) :
import
2022-12-13 09:26:24 +00:00
.. / protocol / waku_rln_relay
2022-11-14 13:59:34 +00:00
2022-10-18 14:05:53 +00:00
declarePublicGauge waku_version , " Waku version info (in git describe format) " , [ " version " ]
declarePublicCounter waku_node_messages , " number of messages received " , [ " type " ]
declarePublicGauge waku_node_errors , " number of wakunode errors " , [ " type " ]
2022-10-28 14:30:01 +00:00
declarePublicGauge waku_lightpush_peers , " number of lightpush peers "
2022-11-02 10:59:58 +00:00
declarePublicGauge waku_filter_peers , " number of filter peers "
2022-10-28 18:11:28 +00:00
declarePublicGauge waku_store_peers , " number of store peers "
2022-11-02 08:45:21 +00:00
declarePublicGauge waku_px_peers , " number of peers (in the node ' s peerManager) supporting the peer exchange protocol "
2022-10-28 14:30:01 +00:00
2022-10-18 14:05:53 +00:00
logScope :
2022-11-03 15:36:24 +00:00
topics = " waku node "
2022-10-18 14:05:53 +00:00
2022-11-02 10:59:58 +00:00
2022-10-18 14:05:53 +00:00
# Git version in git describe format (defined compile time)
const git_version * {. strdefine . } = " n/a "
# Default clientId
const clientId * = " Nimbus Waku v2 node "
# Default Waku Filter Timeout
const WakuFilterTimeout : Duration = 1 . days
# key and crypto modules different
type
2022-10-27 15:29:09 +00:00
# XXX: Weird type, should probably be using pubsub PubsubTopic object name?
2022-10-18 14:05:53 +00:00
Message * = seq [ byte ]
WakuInfo * = object
# NOTE One for simplicity, can extend later as needed
listenAddresses * : seq [ string ]
enrUri * : string
#multiaddrStrings*: seq[string]
# NOTE based on Eth2Node in NBC eth2_network.nim
2022-10-27 15:29:09 +00:00
WakuNode * = ref object
2022-10-18 14:05:53 +00:00
peerManager * : PeerManager
switch * : Switch
wakuRelay * : WakuRelay
2022-11-23 09:08:00 +00:00
wakuArchive * : WakuArchive
2022-10-18 14:05:53 +00:00
wakuStore * : WakuStore
2022-10-28 18:11:28 +00:00
wakuStoreClient * : WakuStoreClient
2022-10-18 14:05:53 +00:00
wakuFilter * : WakuFilter
2022-11-02 10:59:58 +00:00
wakuFilterClient * : WakuFilterClient
2022-10-18 14:05:53 +00:00
wakuSwap * : WakuSwap
2022-11-14 13:59:34 +00:00
when defined ( rln ) :
wakuRlnRelay * : WakuRLNRelay
2022-10-18 14:05:53 +00:00
wakuLightPush * : WakuLightPush
2022-10-28 14:30:01 +00:00
wakuLightpushClient * : WakuLightPushClient
2022-10-18 14:05:53 +00:00
wakuPeerExchange * : WakuPeerExchange
enr * : enr . Record
libp2pPing * : Ping
rng * : ref rand . HmacDrbgContext
wakuDiscv5 * : WakuDiscoveryV5
announcedAddresses * : seq [ MultiAddress ]
started * : bool # Indicates that node has started listening
2022-11-24 13:11:23 +00:00
proc protocolMatcher * ( codec : string ) : Matcher =
2022-10-18 14:05:53 +00:00
## Returns a protocol matcher function for the provided codec
proc match ( proto : string ) : bool {. gcsafe . } =
## Matches a proto with any postfix to the provided codec.
## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
return proto . startsWith ( codec )
return match
template ip4TcpEndPoint ( address , port ) : MultiAddress =
MultiAddress . init ( address , tcpProtocol , port )
template dns4Ma ( dns4DomainName : string ) : MultiAddress =
MultiAddress . init ( " /dns4/ " & dns4DomainName ) . tryGet ( )
template tcpPortMa ( port : Port ) : MultiAddress =
MultiAddress . init ( " /tcp/ " & $ port ) . tryGet ( )
template dns4TcpEndPoint ( dns4DomainName : string , port : Port ) : MultiAddress =
dns4Ma ( dns4DomainName ) & tcpPortMa ( port )
template wsFlag ( wssEnabled : bool ) : MultiAddress =
if wssEnabled : MultiAddress . init ( " /wss " ) . tryGet ( )
else : MultiAddress . init ( " /ws " ) . tryGet ( )
2023-02-07 13:06:50 +00:00
type NetConfig * = object
hostAddress * : MultiAddress
wsHostAddress * : Option [ MultiAddress ]
hostExtAddress * : Option [ MultiAddress ]
wsExtAddress * : Option [ MultiAddress ]
wssEnabled * : bool
extIp * : Option [ ValidIpAddress ]
extPort * : Option [ Port ]
dns4DomainName * : Option [ string ]
announcedAddresses * : seq [ MultiAddress ]
extMultiAddrs * : seq [ MultiAddress ]
enrMultiAddrs * : seq [ MultiAddress ]
enrIp * : Option [ ValidIpAddress ]
enrPort * : Option [ Port ]
discv5UdpPort * : Option [ Port ]
wakuFlags * : Option [ WakuEnrBitfield ]
bindIp * : ValidIpAddress
bindPort * : Port
proc init * (
T : type NetConfig ,
bindIp : ValidIpAddress ,
bindPort : Port ,
extIp = none ( ValidIpAddress ) ,
extPort = none ( Port ) ,
extMultiAddrs = newSeq [ MultiAddress ] ( ) ,
wsBindPort : Port = ( Port ) 8000 ,
wsEnabled : bool = false ,
wssEnabled : bool = false ,
dns4DomainName = none ( string ) ,
discv5UdpPort = none ( Port ) ,
wakuFlags = none ( WakuEnrBitfield )
) : T {. raises : [ LPError ] } =
2022-10-18 14:05:53 +00:00
## Initialize addresses
let
# Bind addresses
hostAddress = ip4TcpEndPoint ( bindIp , bindPort )
wsHostAddress = if wsEnabled or wssEnabled : some ( ip4TcpEndPoint ( bindIp , wsbindPort ) & wsFlag ( wssEnabled ) )
else : none ( MultiAddress )
2023-02-07 13:06:50 +00:00
enrIp = if extIp . isSome ( ) : extIp else : some ( bindIp )
enrPort = if extPort . isSome ( ) : extPort else : some ( bindPort )
2022-10-18 14:05:53 +00:00
# Setup external addresses, if available
var
hostExtAddress , wsExtAddress = none ( MultiAddress )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
if ( dns4DomainName . isSome ( ) ) :
# Use dns4 for externally announced addresses
hostExtAddress = some ( dns4TcpEndPoint ( dns4DomainName . get ( ) , extPort . get ( ) ) )
if ( wsHostAddress . isSome ( ) ) :
wsExtAddress = some ( dns4TcpEndPoint ( dns4DomainName . get ( ) , wsBindPort ) & wsFlag ( wssEnabled ) )
else :
# No public domain name, use ext IP if available
if extIp . isSome ( ) and extPort . isSome ( ) :
hostExtAddress = some ( ip4TcpEndPoint ( extIp . get ( ) , extPort . get ( ) ) )
if ( wsHostAddress . isSome ( ) ) :
wsExtAddress = some ( ip4TcpEndPoint ( extIp . get ( ) , wsBindPort ) & wsFlag ( wssEnabled ) )
2023-01-26 10:18:30 +00:00
var announcedAddresses = newSeq [ MultiAddress ] ( )
2022-10-27 15:29:09 +00:00
if hostExtAddress . isSome ( ) :
2022-10-18 14:05:53 +00:00
announcedAddresses . add ( hostExtAddress . get ( ) )
else :
announcedAddresses . add ( hostAddress ) # We always have at least a bind address for the host
2022-11-21 08:36:41 +00:00
2023-01-26 10:18:30 +00:00
# External multiaddrs that the operator may have configured
if extMultiAddrs . len > 0 :
announcedAddresses . add ( extMultiAddrs )
2022-10-27 15:29:09 +00:00
if wsExtAddress . isSome ( ) :
2022-10-18 14:05:53 +00:00
announcedAddresses . add ( wsExtAddress . get ( ) )
2022-10-27 15:29:09 +00:00
elif wsHostAddress . isSome ( ) :
2022-10-18 14:05:53 +00:00
announcedAddresses . add ( wsHostAddress . get ( ) )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
let
2023-01-26 10:18:30 +00:00
# enrMultiaddrs are just addresses which cannot be represented in ENR, as described in
# https://rfc.vac.dev/spec/31/#many-connection-types
enrMultiaddrs = announcedAddresses . filterIt ( it . hasProtocol ( " dns4 " ) or
it . hasProtocol ( " dns6 " ) or
it . hasProtocol ( " ws " ) or
it . hasProtocol ( " wss " ) )
2022-11-21 08:36:41 +00:00
2023-02-07 13:06:50 +00:00
return NetConfig (
hostAddress : hostAddress ,
wsHostAddress : wsHostAddress ,
hostExtAddress : hostExtAddress ,
wsExtAddress : wsExtAddress ,
extIp : extIp ,
extPort : extPort ,
wssEnabled : wssEnabled ,
dns4DomainName : dns4DomainName ,
announcedAddresses : announcedAddresses ,
extMultiAddrs : extMultiAddrs ,
enrMultiaddrs : enrMultiaddrs ,
enrIp : enrIp ,
enrPort : enrPort ,
discv5UdpPort : discv5UdpPort ,
bindIp : bindIp ,
bindPort : bindPort ,
wakuFlags : wakuFlags )
proc getEnr * ( netConfig : NetConfig ,
wakuDiscV5 = none ( WakuDiscoveryV5 ) ,
nodeKey : crypto . PrivateKey ) : enr . Record =
if wakuDiscV5 . isSome ( ) :
return wakuDiscV5 . get ( ) . protocol . getRecord ( )
return enr . Record . init ( nodekey ,
netConfig . enrIp ,
netConfig . enrPort ,
netConfig . discv5UdpPort ,
netConfig . wakuFlags ,
netConfig . enrMultiaddrs )
proc getAutonatService * ( rng = crypto . newRng ( ) ) : AutonatService =
2023-01-11 09:57:49 +00:00
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService . new (
autonatClient = AutonatClient . new ( ) ,
rng = rng ,
scheduleInterval = some ( chronos . seconds ( 120 ) ) ,
askNewConnectedPeers = false ,
numPeersToAsk = 3 ,
maxQueueSize = 3 ,
minConfidence = 0 .7 )
proc statusAndConfidenceHandler ( networkReachability : NetworkReachability , confidence : Option [ float ] ) {. gcsafe , async . } =
if confidence . isSome ( ) :
info " Peer reachability status " , networkReachability = networkReachability , confidence = confidence . get ( )
autonatService . statusAndConfidenceHandler ( statusAndConfidenceHandler )
2023-02-07 13:06:50 +00:00
return autonatService
## retain old signature, but deprecate it
proc new * ( T : type WakuNode ,
nodeKey : crypto . PrivateKey ,
bindIp : ValidIpAddress ,
bindPort : Port ,
extIp = none ( ValidIpAddress ) ,
extPort = none ( Port ) ,
extMultiAddrs = newSeq [ MultiAddress ] ( ) ,
peerStorage : PeerStorage = nil ,
maxConnections = builders . MaxConnections ,
wsBindPort : Port = ( Port ) 8000 ,
wsEnabled : bool = false ,
wssEnabled : bool = false ,
secureKey : string = " " ,
secureCert : string = " " ,
wakuFlags = none ( WakuEnrBitfield ) ,
nameResolver : NameResolver = nil ,
sendSignedPeerRecord = false ,
dns4DomainName = none ( string ) ,
discv5UdpPort = none ( Port ) ,
wakuDiscv5 = none ( WakuDiscoveryV5 ) ,
agentString = none ( string ) , # defaults to nim-libp2p version
peerStoreCapacity = none ( int ) , # defaults to 1.25 maxConnections
# TODO: make this argument required after tests are updated
rng : ref HmacDrbgContext = crypto . newRng ( )
) : T {. raises : [ Defect , LPError , IOError , TLSStreamProtocolError ] , deprecated : " Use NetConfig variant " . } =
let netConfig = NetConfig . init (
bindIp = bindIp ,
bindPort = bindPort ,
extIp = extIp ,
extPort = extPort ,
extMultiAddrs = extMultiAddrs ,
wsBindPort = wsBindPort ,
wsEnabled = wsEnabled ,
wssEnabled = wssEnabled ,
wakuFlags = wakuFlags ,
dns4DomainName = dns4DomainName ,
discv5UdpPort = discv5UdpPort ,
)
return WakuNode . new (
nodeKey = nodeKey ,
netConfig = netConfig ,
peerStorage = peerStorage ,
maxConnections = maxConnections ,
secureKey = secureKey ,
secureCert = secureCert ,
nameResolver = nameResolver ,
sendSignedPeerRecord = sendSignedPeerRecord ,
wakuDiscv5 = wakuDiscv5 ,
agentString = agentString ,
peerStoreCapacity = peerStoreCapacity ,
)
proc new * ( T : type WakuNode ,
nodeKey : crypto . PrivateKey ,
netConfig : NetConfig ,
peerStorage : PeerStorage = nil ,
maxConnections = builders . MaxConnections ,
secureKey : string = " " ,
secureCert : string = " " ,
nameResolver : NameResolver = nil ,
sendSignedPeerRecord = false ,
wakuDiscv5 = none ( WakuDiscoveryV5 ) ,
agentString = none ( string ) , # defaults to nim-libp2p version
peerStoreCapacity = none ( int ) , # defaults to 1.25 maxConnections
# TODO: make this argument required after tests are updated
rng : ref HmacDrbgContext = crypto . newRng ( )
) : T {. raises : [ Defect , LPError , IOError , TLSStreamProtocolError ] . } =
## Creates a Waku Node instance.
info " Initializing networking " , addrs = netConfig . announcedAddresses
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
let switch = newWakuSwitch (
some ( nodekey ) ,
2023-02-07 13:06:50 +00:00
address = netConfig . hostAddress ,
wsAddress = netConfig . wsHostAddress ,
2022-10-18 14:05:53 +00:00
transportFlags = { ServerFlags . ReuseAddr } ,
2022-11-21 08:36:41 +00:00
rng = rng ,
2022-10-18 14:05:53 +00:00
maxConnections = maxConnections ,
2023-02-07 13:06:50 +00:00
wssEnabled = netConfig . wssEnabled ,
2022-10-18 14:05:53 +00:00
secureKeyPath = secureKey ,
secureCertPath = secureCert ,
nameResolver = nameResolver ,
2022-10-28 13:12:06 +00:00
sendSignedPeerRecord = sendSignedPeerRecord ,
2022-11-24 13:11:23 +00:00
agentString = agentString ,
peerStoreCapacity = peerStoreCapacity ,
2023-02-07 13:06:50 +00:00
services = @ [ Service ( getAutonatService ( rng ) ) ] ,
2022-10-27 15:29:09 +00:00
)
2022-11-21 08:36:41 +00:00
2023-02-07 13:06:50 +00:00
return WakuNode (
2022-10-18 14:05:53 +00:00
peerManager : PeerManager . new ( switch , peerStorage ) ,
switch : switch ,
rng : rng ,
2023-02-07 13:06:50 +00:00
enr : netConfig . getEnr ( wakuDiscv5 , nodekey ) ,
announcedAddresses : netConfig . announcedAddresses ,
wakuDiscv5 : if wakuDiscV5 . isSome ( ) : wakuDiscV5 . get ( ) else : nil ,
2022-10-18 14:05:53 +00:00
)
2022-11-21 08:36:41 +00:00
proc peerInfo * ( node : WakuNode ) : PeerInfo =
2022-10-18 14:05:53 +00:00
node . switch . peerInfo
2022-10-27 15:29:09 +00:00
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info * ( node : WakuNode ) : WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
let peerInfo = node . switch . peerInfo
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
var listenStr : seq [ string ]
for address in node . announcedAddresses :
var fulladdr = $ address & " /p2p/ " & $ peerInfo . peerId
listenStr & = fulladdr
let enrUri = node . enr . toUri ( )
let wakuInfo = WakuInfo ( listenAddresses : listenStr , enrUri : enrUri )
return wakuInfo
proc connectToNodes * ( node : WakuNode , nodes : seq [ RemotePeerInfo ] | seq [ string ] , source = " api " ) {. async . } =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE This is dialing on WakuRelay protocol specifically
2023-02-08 15:26:23 +00:00
await peer_manager . connectToNodes ( node . peerManager , nodes , WakuRelayCodec , source = source )
2022-10-27 15:29:09 +00:00
## Waku relay
2023-02-08 15:09:59 +00:00
proc registerRelayDefaultHandler ( node : WakuNode , topic : PubsubTopic ) =
if node . wakuRelay . isSubscribed ( topic ) :
2022-10-18 14:05:53 +00:00
return
2023-02-08 15:09:59 +00:00
proc traceHandler ( topic : PubsubTopic , data : seq [ byte ] ) {. async , gcsafe . } =
2023-02-03 08:06:21 +00:00
trace " waku.relay received " ,
pubsubTopic = topic ,
hash = MultiHash . digest ( " sha2-256 " , data ) . expect ( " valid hash " ) . data . buffer . to0xHex ( ) , # TODO: this could be replaced by a message UID
receivedTime = getNowInNanosecondTime ( )
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
waku_node_messages . inc ( labelValues = [ " relay " ] )
proc filterHandler ( topic : PubsubTopic , msg : WakuMessage ) {. async , gcsafe . } =
if node . wakuFilter . isNil ( ) :
return
2022-11-21 08:36:41 +00:00
2023-02-08 15:09:59 +00:00
await node . wakuFilter . handleMessage ( topic , msg )
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
proc archiveHandler ( topic : PubsubTopic , msg : WakuMessage ) {. async , gcsafe . } =
if node . wakuArchive . isNil ( ) :
return
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
node . wakuArchive . handleMessage ( topic , msg )
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
let defaultHandler = proc ( topic : PubsubTopic , data : seq [ byte ] ) {. async , gcsafe . } =
let msg = WakuMessage . decode ( data )
if msg . isErr ( ) :
return
await traceHandler ( topic , data )
await filterHandler ( topic , msg . value )
await archiveHandler ( topic , msg . value )
node . wakuRelay . subscribe ( topic , defaultHandler )
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
proc subscribe * ( node : WakuNode , topic : PubsubTopic ) =
if node . wakuRelay . isNil ( ) :
error " Invalid API call to `subscribe`. WakuRelay not mounted. "
return
debug " subscribe " , pubsubTopic = topic
node . registerRelayDefaultHandler ( topic )
proc subscribe * ( node : WakuNode , topic : PubsubTopic , handler : WakuRelayHandler ) =
2022-10-18 14:05:53 +00:00
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
2023-02-08 15:09:59 +00:00
if node . wakuRelay . isNil ( ) :
error " Invalid API call to `subscribe`. WakuRelay not mounted. "
return
debug " subscribe " , pubsubTopic = topic
2022-10-18 14:05:53 +00:00
2023-02-08 15:09:59 +00:00
node . registerRelayDefaultHandler ( topic )
node . wakuRelay . subscribe ( topic , handler )
proc unsubscribe * ( node : WakuNode , topic : PubsubTopic , handler : WakuRelayHandler ) =
2022-10-18 14:05:53 +00:00
## Unsubscribes a handler from a PubSub topic.
2022-10-27 15:29:09 +00:00
if node . wakuRelay . isNil ( ) :
2022-10-18 14:05:53 +00:00
error " Invalid API call to `unsubscribe`. WakuRelay not mounted. "
return
2022-11-21 08:36:41 +00:00
2023-02-08 15:09:59 +00:00
debug " unsubscribe " , oubsubTopic = topic
2022-10-18 14:05:53 +00:00
let wakuRelay = node . wakuRelay
wakuRelay . unsubscribe ( @ [ ( topic , handler ) ] )
2022-10-27 15:29:09 +00:00
proc unsubscribeAll * ( node : WakuNode , topic : PubsubTopic ) =
2022-10-18 14:05:53 +00:00
## Unsubscribes all handlers registered on a specific PubSub topic.
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
if node . wakuRelay . isNil ( ) :
2022-10-18 14:05:53 +00:00
error " Invalid API call to `unsubscribeAll`. WakuRelay not mounted. "
return
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
info " unsubscribeAll " , topic = topic
2023-02-08 15:09:59 +00:00
node . wakuRelay . unsubscribeAll ( topic )
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
proc publish * ( node : WakuNode , topic : PubsubTopic , message : WakuMessage ) {. async , gcsafe . } =
2022-10-18 14:05:53 +00:00
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
2022-11-21 08:36:41 +00:00
2022-10-25 12:55:31 +00:00
if node . wakuRelay . isNil ( ) :
2022-10-18 14:05:53 +00:00
error " Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead. "
2022-10-25 12:55:31 +00:00
# TODO: Improve error handling
2022-10-18 14:05:53 +00:00
return
2023-01-25 13:35:38 +00:00
discard await node . wakuRelay . publish ( topic , message )
2022-10-18 14:05:53 +00:00
2023-02-03 08:06:21 +00:00
trace " waku.relay published " ,
pubsubTopic = topic ,
hash = MultiHash . digest ( " sha2-256 " , message . encode ( ) . buffer ) . expect ( " valid hash " ) . data . buffer . to0xHex ( ) , # TODO: this could be replaced by a message UID
publishTime = getNowInNanosecondTime ( )
2022-10-27 15:29:09 +00:00
proc startRelay * ( node : WakuNode ) {. async . } =
2022-11-18 13:50:56 +00:00
## Setup and start relay protocol
info " starting relay protocol "
2022-10-27 15:29:09 +00:00
if node . wakuRelay . isNil ( ) :
2023-02-08 15:09:59 +00:00
error " Failed to start relay. Not mounted. "
2022-10-27 15:29:09 +00:00
return
2022-10-18 14:05:53 +00:00
2022-11-18 13:50:56 +00:00
## Setup relay protocol
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
# Resume previous relay connections
2022-11-24 13:11:23 +00:00
if node . peerManager . peerStore . hasPeers ( protocolMatcher ( WakuRelayCodec ) ) :
2022-10-27 15:29:09 +00:00
info " Found previous WakuRelay peers. Reconnecting. "
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node . wakuRelay . parameters . pruneBackoff + chronos . seconds ( BackoffSlackTime )
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
await node . peerManager . reconnectPeers ( WakuRelayCodec ,
protocolMatcher ( WakuRelayCodec ) ,
backoffPeriod )
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
# Start the WakuRelay protocol
await node . wakuRelay . start ( )
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
info " relay started successfully "
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
proc mountRelay * ( node : WakuNode ,
2022-11-18 13:50:56 +00:00
topics : seq [ string ] = @ [ ] ,
2022-10-27 15:29:09 +00:00
triggerSelf = true ,
2022-11-18 13:50:56 +00:00
peerExchangeHandler = none ( RoutingRecordsHandler ) ) {. async , gcsafe . } =
## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
info " mounting relay protocol "
2022-11-21 08:36:41 +00:00
2022-11-18 13:50:56 +00:00
let initRes = WakuRelay . new (
2023-01-11 08:10:46 +00:00
node . switch ,
2022-11-18 13:50:56 +00:00
triggerSelf = triggerSelf
)
if initRes . isErr ( ) :
error " failed mountin relay protocol " , error = initRes . error
return
2022-10-18 14:05:53 +00:00
2022-11-18 13:50:56 +00:00
node . wakuRelay = initRes . value
2022-10-27 15:29:09 +00:00
## Add peer exchange handler
if peerExchangeHandler . isSome ( ) :
2022-11-18 13:50:56 +00:00
node . wakuRelay . parameters . enablePX = true # Feature flag for peer exchange in nim-libp2p
node . wakuRelay . routingRecordsHandler . add ( peerExchangeHandler . get ( ) )
2022-10-27 15:29:09 +00:00
if node . started :
await node . startRelay ( )
2022-11-18 13:50:56 +00:00
node . switch . mount ( node . wakuRelay , protocolMatcher ( WakuRelayCodec ) )
2022-10-27 15:29:09 +00:00
info " relay mounted successfully "
2022-10-18 14:05:53 +00:00
2023-02-10 14:17:50 +00:00
# Subscribe to topics
for topic in topics :
node . subscribe ( topic )
2022-10-27 15:29:09 +00:00
## Waku filter
2022-10-18 14:05:53 +00:00
proc mountFilter * ( node : WakuNode , filterTimeout : Duration = WakuFilterTimeout ) {. async , raises : [ Defect , LPError ] } =
2022-11-02 10:59:58 +00:00
info " mounting filter protocol "
node . wakuFilter = WakuFilter . new ( node . peerManager , node . rng , filterTimeout )
2022-10-18 14:05:53 +00:00
if node . started :
await node . wakuFilter . start ( )
node . switch . mount ( node . wakuFilter , protocolMatcher ( WakuFilterCodec ) )
2022-11-02 10:59:58 +00:00
proc filterHandleMessage * ( node : WakuNode , pubsubTopic : PubsubTopic , message : WakuMessage ) {. async . } =
2022-10-27 15:29:09 +00:00
if node . wakuFilter . isNil ( ) :
2022-11-02 10:59:58 +00:00
error " cannot handle filter message " , error = " waku filter is nil "
2022-10-27 15:29:09 +00:00
return
2022-11-02 10:59:58 +00:00
await node . wakuFilter . handleMessage ( pubsubTopic , message )
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
proc mountFilterClient * ( node : WakuNode ) {. async , raises : [ Defect , LPError ] . } =
info " mounting filter client "
node . wakuFilterClient = WakuFilterClient . new ( node . peerManager , node . rng )
if node . started :
# Node has started already. Let's start filter too.
await node . wakuFilterClient . start ( )
node . switch . mount ( node . wakuFilterClient , protocolMatcher ( WakuFilterCodec ) )
2022-11-21 08:36:41 +00:00
proc filterSubscribe * ( node : WakuNode , pubsubTopic : PubsubTopic , contentTopics : ContentTopic | seq [ ContentTopic ] ,
2022-11-02 10:59:58 +00:00
handler : FilterPushHandler , peer : RemotePeerInfo | string ) {. async , gcsafe , raises : [ Defect , ValueError ] . } =
2022-10-27 15:29:09 +00:00
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
2022-11-02 10:59:58 +00:00
if node . wakuFilterClient . isNil ( ) :
error " cannot register filter subscription to topic " , error = " waku filter client is nil "
return
2022-11-21 08:36:41 +00:00
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
2022-11-02 10:59:58 +00:00
else : peer
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
info " registering filter subscription to content " , pubsubTopic = pubsubTopic , contentTopics = contentTopics , peer = remotePeer
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
# TODO: Move this logic to wakunode2 app
let handlerWrapper : FilterPushHandler = proc ( pubsubTopic : string , message : WakuMessage ) {. raises : [ Exception ] . } =
if node . wakuRelay . isNil ( ) and not node . wakuStore . isNil ( ) :
2022-11-23 09:08:00 +00:00
node . wakuArchive . handleMessage ( pubSubTopic , message )
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
handler ( pubsubTopic , message )
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
let subRes = await node . wakuFilterClient . subscribe ( pubsubTopic , contentTopics , handlerWrapper , peer = remotePeer )
if subRes . isOk ( ) :
info " subscribed to topic " , pubsubTopic = pubsubTopic , contentTopics = contentTopics
else :
error " failed filter subscription " , error = subRes . error
waku_node_errors . inc ( labelValues = [ " subscribe_filter_failure " ] )
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
proc filterUnsubscribe * ( node : WakuNode , pubsubTopic : PubsubTopic , contentTopics : ContentTopic | seq [ ContentTopic ] ,
peer : RemotePeerInfo | string ) {. async , gcsafe , raises : [ Defect , ValueError ] . } =
2022-10-27 15:29:09 +00:00
## Unsubscribe from a content filter.
2022-11-02 10:59:58 +00:00
if node . wakuFilterClient . isNil ( ) :
error " cannot unregister filter subscription to content " , error = " waku filter client is nil "
return
2022-11-21 08:36:41 +00:00
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
2022-11-02 10:59:58 +00:00
else : peer
2022-11-21 08:36:41 +00:00
2022-11-02 10:59:58 +00:00
info " deregistering filter subscription to content " , pubsubTopic = pubsubTopic , contentTopics = contentTopics , peer = remotePeer
2022-11-21 08:36:41 +00:00
2022-11-02 10:59:58 +00:00
let unsubRes = await node . wakuFilterClient . unsubscribe ( pubsubTopic , contentTopics , peer = remotePeer )
if unsubRes . isOk ( ) :
info " unsubscribed from topic " , pubsubTopic = pubsubTopic , contentTopics = contentTopics
else :
error " failed filter unsubscription " , error = unsubRes . error
waku_node_errors . inc ( labelValues = [ " unsubscribe_filter_failure " ] )
# TODO: Move to application module (e.g., wakunode2.nim)
proc setFilterPeer * ( node : WakuNode , peer : RemotePeerInfo | string ) {. raises : [ Defect , ValueError , LPError ] ,
deprecated : " Use the explicit destination peer procedures " . } =
if node . wakuFilterClient . isNil ( ) :
error " could not set peer, waku filter client is nil "
return
info " seting filter client peer " , peer = peer
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
else : peer
node . peerManager . addPeer ( remotePeer , WakuFilterCodec )
waku_filter_peers . inc ( )
# TODO: Move to application module (e.g., wakunode2.nim)
proc subscribe * ( node : WakuNode , pubsubTopic : PubsubTopic , contentTopics : ContentTopic | seq [ ContentTopic ] , handler : FilterPushHandler ) {. async , gcsafe ,
deprecated : " Use the explicit destination peer procedure. Use ' node.filterSubscribe() ' instead. " . } =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node . wakuFilterClient . isNil ( ) :
error " cannot register filter subscription to topic " , error = " waku filter client is nil "
return
2022-11-21 08:36:41 +00:00
2023-01-26 09:20:20 +00:00
let peerOpt = node . peerManager . selectPeer ( WakuFilterCodec )
2022-11-02 10:59:58 +00:00
if peerOpt . isNone ( ) :
error " cannot register filter subscription to topic " , error = " no suitable remote peers "
return
2022-11-21 08:36:41 +00:00
2022-11-02 10:59:58 +00:00
await node . filterSubscribe ( pubsubTopic , contentTopics , handler , peer = peerOpt . get ( ) )
# TODO: Move to application module (e.g., wakunode2.nim)
proc unsubscribe * ( node : WakuNode , pubsubTopic : PubsubTopic , contentTopics : ContentTopic | seq [ ContentTopic ] ) {. async , gcsafe ,
deprecated : " Use the explicit destination peer procedure. Use ' node.filterUnsusbscribe() ' instead. " . } =
## Unsubscribe from a content filter.
if node . wakuFilterClient . isNil ( ) :
error " cannot unregister filter subscription to content " , error = " waku filter client is nil "
return
2022-11-21 08:36:41 +00:00
2023-01-26 09:20:20 +00:00
let peerOpt = node . peerManager . selectPeer ( WakuFilterCodec )
2022-11-02 10:59:58 +00:00
if peerOpt . isNone ( ) :
error " cannot register filter subscription to topic " , error = " no suitable remote peers "
return
2022-10-27 15:29:09 +00:00
2022-11-02 10:59:58 +00:00
await node . filterUnsubscribe ( pubsubTopic , contentTopics , peer = peerOpt . get ( ) )
2022-10-27 15:29:09 +00:00
## Waku swap
2022-10-18 14:05:53 +00:00
# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
proc mountSwap * ( node : WakuNode , swapConfig : SwapConfig = SwapConfig . init ( ) ) {. async , raises : [ Defect , LPError ] . } =
info " mounting swap " , mode = $ swapConfig . mode
node . wakuSwap = WakuSwap . init ( node . peerManager , node . rng , swapConfig )
if node . started :
# Node has started already. Let's start swap too.
await node . wakuSwap . start ( )
node . switch . mount ( node . wakuSwap , protocolMatcher ( WakuSwapCodec ) )
2022-11-23 09:08:00 +00:00
## Waku archive
2022-10-27 15:29:09 +00:00
2022-11-23 09:08:00 +00:00
proc mountArchive * ( node : WakuNode ,
driver : Option [ ArchiveDriver ] ,
messageValidator : Option [ MessageValidator ] ,
retentionPolicy : Option [ RetentionPolicy ] ) =
2022-10-18 14:05:53 +00:00
2022-11-23 09:08:00 +00:00
if driver . isNone ( ) :
error " failed to mount waku archive protocol " , error = " archive driver not set "
2022-10-18 14:05:53 +00:00
return
2022-11-23 09:08:00 +00:00
node . wakuArchive = WakuArchive . new ( driver . get ( ) , messageValidator , retentionPolicy )
# TODO: Review this periodic task. Maybe, move it to the appplication code
const WakuArchiveDefaultRetentionPolicyInterval * = 30 . minutes
proc executeMessageRetentionPolicy * ( node : WakuNode ) =
if node . wakuArchive . isNil ( ) :
2022-10-18 14:05:53 +00:00
return
debug " executing message retention policy "
2022-11-23 09:08:00 +00:00
node . wakuArchive . executeMessageRetentionPolicy ( )
node . wakuArchive . reportStoredMessagesMetric ( )
2022-10-18 14:05:53 +00:00
proc startMessageRetentionPolicyPeriodicTask * ( node : WakuNode , interval : Duration ) =
2022-11-23 09:08:00 +00:00
if node . wakuArchive . isNil ( ) :
2022-10-18 14:05:53 +00:00
return
# https://github.com/nim-lang/Nim/issues/17369
var executeRetentionPolicy : proc ( udata : pointer ) {. gcsafe , raises : [ Defect ] . }
2022-11-21 08:36:41 +00:00
executeRetentionPolicy = proc ( udata : pointer ) {. gcsafe . } =
2022-10-18 14:05:53 +00:00
executeMessageRetentionPolicy ( node )
discard setTimer ( Moment . fromNow ( interval ) , executeRetentionPolicy )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
discard setTimer ( Moment . fromNow ( interval ) , executeRetentionPolicy )
2022-11-23 09:08:00 +00:00
## Waku store
# TODO: Review this mapping logic. Maybe, move it to the appplication code
proc toArchiveQuery ( request : HistoryQuery ) : ArchiveQuery =
ArchiveQuery (
pubsubTopic : request . pubsubTopic ,
contentTopics : request . contentTopics ,
cursor : request . cursor . map ( proc ( cursor : HistoryCursor ) : ArchiveCursor = ArchiveCursor ( pubsubTopic : cursor . pubsubTopic , senderTime : cursor . senderTime , storeTime : cursor . storeTime , digest : cursor . digest ) ) ,
startTime : request . startTime ,
endTime : request . endTime ,
pageSize : request . pageSize . uint ,
ascending : request . ascending
2022-10-18 14:05:53 +00:00
)
2022-11-23 09:08:00 +00:00
# TODO: Review this mapping logic. Maybe, move it to the appplication code
proc toHistoryResult * ( res : ArchiveResult ) : HistoryResult =
if res . isErr ( ) :
let error = res . error
case res . error . kind :
of ArchiveErrorKind . DRIVER_ERROR , ArchiveErrorKind . INVALID_QUERY :
err ( HistoryError (
kind : HistoryErrorKind . BAD_REQUEST ,
cause : res . error . cause
) )
else :
err ( HistoryError ( kind : HistoryErrorKind . UNKNOWN ) )
else :
let response = res . get ( )
ok ( HistoryResponse (
messages : response . messages ,
cursor : response . cursor . map ( proc ( cursor : ArchiveCursor ) : HistoryCursor = HistoryCursor ( pubsubTopic : cursor . pubsubTopic , senderTime : cursor . senderTime , storeTime : cursor . storeTime , digest : cursor . digest ) ) ,
) )
proc mountStore * ( node : WakuNode ) {. async , raises : [ Defect , LPError ] . } =
info " mounting waku store protocol "
if node . wakuArchive . isNil ( ) :
error " failed to mount waku store protocol " , error = " waku archive not set "
return
# TODO: Review this handler logic. Maybe, move it to the appplication code
let queryHandler : HistoryQueryHandler = proc ( request : HistoryQuery ) : HistoryResult =
let request = request . toArchiveQuery ( )
let response = node . wakuArchive . findMessages ( request )
response . toHistoryResult ( )
node . wakuStore = WakuStore . new ( node . peerManager , node . rng , queryHandler )
2022-10-18 14:05:53 +00:00
if node . started :
# Node has started already. Let's start store too.
await node . wakuStore . start ( )
node . switch . mount ( node . wakuStore , protocolMatcher ( WakuStoreCodec ) )
2022-11-02 10:59:58 +00:00
2022-11-23 09:08:00 +00:00
proc mountStoreClient * ( node : WakuNode ) =
2022-11-02 10:59:58 +00:00
info " mounting store client "
2022-11-23 09:08:00 +00:00
node . wakuStoreClient = WakuStoreClient . new ( node . peerManager , node . rng )
2022-11-02 10:59:58 +00:00
2022-10-28 18:11:28 +00:00
proc query * ( node : WakuNode , query : HistoryQuery , peer : RemotePeerInfo ) : Future [ WakuStoreResult [ HistoryResponse ] ] {. async , gcsafe . } =
## Queries known nodes for historical messages
if node . wakuStoreClient . isNil ( ) :
return err ( " waku store client is nil " )
let queryRes = await node . wakuStoreClient . query ( query , peer )
if queryRes . isErr ( ) :
2022-11-09 17:50:18 +00:00
return err ( $ queryRes . error )
2022-11-21 08:36:41 +00:00
2022-10-28 18:11:28 +00:00
let response = queryRes . get ( )
return ok ( response )
# TODO: Move to application module (e.g., wakunode2.nim)
proc setStorePeer * ( node : WakuNode , peer : RemotePeerInfo | string ) {. raises : [ Defect , ValueError , LPError ] ,
deprecated : " Use ' node.query() ' with peer destination instead " . } =
if node . wakuStoreClient . isNil ( ) :
error " could not set peer, waku store client is nil "
2022-10-18 14:05:53 +00:00
return
2022-10-28 18:11:28 +00:00
info " set store peer " , peer = peer
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
else : peer
2022-10-28 18:11:28 +00:00
node . peerManager . addPeer ( remotePeer , WakuStoreCodec )
waku_store_peers . inc ( )
2022-10-18 14:05:53 +00:00
2022-10-28 18:11:28 +00:00
# TODO: Move to application module (e.g., wakunode2.nim)
proc query * ( node : WakuNode , query : HistoryQuery ) : Future [ WakuStoreResult [ HistoryResponse ] ] {. async , gcsafe ,
deprecated : " Use ' node.query() ' with peer destination instead " . } =
2022-10-27 15:29:09 +00:00
## Queries known nodes for historical messages
2022-10-28 18:11:28 +00:00
if node . wakuStoreClient . isNil ( ) :
return err ( " waku store client is nil " )
2022-10-18 14:05:53 +00:00
2023-01-26 09:20:20 +00:00
let peerOpt = node . peerManager . selectPeer ( WakuStoreCodec )
2022-10-28 18:11:28 +00:00
if peerOpt . isNone ( ) :
error " no suitable remote peers "
return err ( " peer_not_found_failure " )
return await node . query ( query , peerOpt . get ( ) )
2022-10-18 14:05:53 +00:00
2022-11-23 09:08:00 +00:00
when defined ( waku_exp_store_resume ) :
# TODO: Move to application module (e.g., wakunode2.nim)
proc resume * ( node : WakuNode , peerList : Option [ seq [ RemotePeerInfo ] ] = none ( seq [ RemotePeerInfo ] ) ) {. async , gcsafe . } =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the the wakuStore's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node . wakuStoreClient . isNil ( ) :
return
2022-10-18 14:05:53 +00:00
2022-11-23 09:08:00 +00:00
let retrievedMessages = await node . wakuStoreClient . resume ( peerList )
if retrievedMessages . isErr ( ) :
error " failed to resume store " , error = retrievedMessages . error
return
2022-11-21 08:36:41 +00:00
2022-11-23 09:08:00 +00:00
info " the number of retrieved messages since the last online time: " , number = retrievedMessages . value
2022-10-18 14:05:53 +00:00
2022-10-27 15:29:09 +00:00
## Waku lightpush
2022-10-18 14:05:53 +00:00
2022-10-25 12:55:31 +00:00
proc mountLightPush * ( node : WakuNode ) {. async . } =
2022-10-18 14:05:53 +00:00
info " mounting light push "
2022-10-25 12:55:31 +00:00
var pushHandler : PushMessageHandler
if node . wakuRelay . isNil ( ) :
debug " mounting lightpush without relay (nil) "
2022-11-21 08:36:41 +00:00
pushHandler = proc ( peer : PeerId , pubsubTopic : string , message : WakuMessage ) : Future [ WakuLightPushResult [ void ] ] {. async . } =
2022-10-25 12:55:31 +00:00
return err ( " no waku relay found " )
2022-10-18 14:05:53 +00:00
else :
2022-11-21 08:36:41 +00:00
pushHandler = proc ( peer : PeerId , pubsubTopic : string , message : WakuMessage ) : Future [ WakuLightPushResult [ void ] ] {. async . } =
2022-10-25 12:55:31 +00:00
discard await node . wakuRelay . publish ( pubsubTopic , message . encode ( ) . buffer )
return ok ( )
debug " mounting lightpush with relay "
node . wakuLightPush = WakuLightPush . new ( node . peerManager , node . rng , pushHandler )
2022-10-18 14:05:53 +00:00
if node . started :
# Node has started already. Let's start lightpush too.
await node . wakuLightPush . start ( )
node . switch . mount ( node . wakuLightPush , protocolMatcher ( WakuLightPushCodec ) )
2022-11-02 10:59:58 +00:00
proc mountLightPushClient * ( node : WakuNode ) =
info " mounting light push client "
node . wakuLightpushClient = WakuLightPushClient . new ( node . peerManager , node . rng )
2022-10-28 14:30:01 +00:00
proc lightpushPublish * ( node : WakuNode , pubsubTopic : PubsubTopic , message : WakuMessage , peer : RemotePeerInfo ) : Future [ WakuLightPushResult [ void ] ] {. async , gcsafe . } =
2022-10-27 15:29:09 +00:00
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
2022-10-28 14:30:01 +00:00
if node . wakuLightpushClient . isNil ( ) :
return err ( " waku lightpush client is nil " )
debug " publishing message with lightpush " , pubsubTopic = pubsubTopic , contentTopic = message . contentTopic , peer = peer
2022-10-27 15:29:09 +00:00
2022-10-28 14:30:01 +00:00
return await node . wakuLightpushClient . publish ( pubsubTopic , message , peer )
# TODO: Move to application module (e.g., wakunode2.nim)
proc setLightPushPeer * ( node : WakuNode , peer : RemotePeerInfo | string ) {. raises : [ Defect , ValueError , LPError ] ,
deprecated : " Use ' node.lightpushPublish() ' instead " . } =
debug " seting lightpush client peer " , peer = peer
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
else : peer
node . peerManager . addPeer ( remotePeer , WakuLightPushCodec )
waku_lightpush_peers . inc ( )
# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish * ( node : WakuNode , pubsubTopic : PubsubTopic , message : WakuMessage ) : Future [ void ] {. async , gcsafe ,
deprecated : " Use ' node.lightpushPublish() ' instead " . } =
if node . wakuLightpushClient . isNil ( ) :
error " failed to publish message " , error = " waku lightpush client is nil "
return
2022-11-21 08:36:41 +00:00
2023-01-26 09:20:20 +00:00
let peerOpt = node . peerManager . selectPeer ( WakuLightPushCodec )
2022-10-28 14:30:01 +00:00
if peerOpt . isNone ( ) :
error " failed to publish message " , error = " no suitable remote peers "
return
let publishRes = await node . lightpushPublish ( pubsubTopic , message , peer = peerOpt . get ( ) )
if publishRes . isOk ( ) :
return
2022-10-27 15:29:09 +00:00
2022-10-28 14:30:01 +00:00
error " failed to publish message " , error = publishRes . error
2022-10-27 15:29:09 +00:00
2022-12-13 09:26:24 +00:00
## Waku RLN Relay
when defined ( rln ) :
2023-01-11 08:10:46 +00:00
proc mountRlnRelay * ( node : WakuNode ,
2022-12-13 09:26:24 +00:00
rlnConf : WakuRlnConfig ,
spamHandler : Option [ SpamHandler ] = none ( SpamHandler ) ,
registrationHandler : Option [ RegistrationHandler ] = none ( RegistrationHandler ) ) {. async . } =
info " mounting rln relay "
2023-01-11 08:10:46 +00:00
let rlnRelayRes = await WakuRlnRelay . new ( node . wakuRelay ,
2022-12-13 09:26:24 +00:00
rlnConf ,
spamHandler ,
registrationHandler )
if rlnRelayRes . isErr ( ) :
error " failed to mount rln relay " , error = rlnRelayRes . error
return
node . wakuRlnRelay = rlnRelayRes . get ( )
2022-10-27 15:29:09 +00:00
2023-01-26 10:18:30 +00:00
2022-10-27 15:29:09 +00:00
## Waku peer-exchange
2022-11-02 08:45:21 +00:00
proc mountPeerExchange * ( node : WakuNode ) {. async , raises : [ Defect , LPError ] . } =
2022-10-18 14:05:53 +00:00
info " mounting waku peer exchange "
var discv5Opt : Option [ WakuDiscoveryV5 ]
if not node . wakuDiscV5 . isNil ( ) :
discv5Opt = some ( node . wakuDiscV5 )
2022-11-02 08:45:21 +00:00
node . wakuPeerExchange = WakuPeerExchange . new ( node . peerManager , discv5Opt )
2022-10-18 14:05:53 +00:00
if node . started :
await node . wakuPeerExchange . start ( )
node . switch . mount ( node . wakuPeerExchange , protocolMatcher ( WakuPeerExchangeCodec ) )
2023-02-09 15:59:29 +00:00
proc fetchPeerExchangePeers * ( node : Wakunode , amount : uint64 ) {. async , raises : [ Defect ] . } =
if node . wakuPeerExchange . isNil ( ) :
error " could not get peers from px, waku peer-exchange is nil "
return
info " Retrieving peer info via peer exchange protocol "
let pxPeersRes = await node . wakuPeerExchange . request ( amount )
if pxPeersRes . isOk :
var validPeers = 0
for pi in pxPeersRes . get ( ) . peerInfos :
var record : enr . Record
if enr . fromBytes ( record , pi . enr ) :
# TODO: Add source: PX
node . peerManager . addPeer ( record . toRemotePeerInfo ( ) . get , WakuRelayCodec )
validPeers + = 1
info " Retrieved peer info via peer exchange protocol " , validPeers = validPeers
else :
warn " Failed to retrieve peer info via peer exchange protocol " , error = pxPeersRes . error
2022-11-02 08:45:21 +00:00
# TODO: Move to application module (e.g., wakunode2.nim)
2022-10-27 15:29:09 +00:00
proc setPeerExchangePeer * ( node : WakuNode , peer : RemotePeerInfo | string ) {. raises : [ Defect , ValueError , LPError ] . } =
if node . wakuPeerExchange . isNil ( ) :
error " could not set peer, waku peer-exchange is nil "
return
info " Set peer-exchange peer " , peer = peer
let remotePeer = when peer is string : parseRemotePeerInfo ( peer )
else : peer
2022-11-02 08:45:21 +00:00
node . peerManager . addPeer ( remotePeer , WakuPeerExchangeCodec )
waku_px_peers . inc ( )
2022-10-27 15:29:09 +00:00
## Other protocols
2022-10-18 14:05:53 +00:00
proc mountLibp2pPing * ( node : WakuNode ) {. async , raises : [ Defect , LPError ] . } =
info " mounting libp2p ping protocol "
try :
node . libp2pPing = Ping . new ( rng = node . rng )
except Exception as e :
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
# @TODO: remove exception handling once explicit `raises` in ping module
raise newException ( LPError , " Failed to initialize ping protocol " )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
if node . started :
# Node has started already. Let's start ping too.
await node . libp2pPing . start ( )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
node . switch . mount ( node . libp2pPing )
2022-11-24 13:11:23 +00:00
# TODO: Move this logic to PeerManager
2022-10-18 14:05:53 +00:00
proc keepaliveLoop ( node : WakuNode , keepalive : chronos . Duration ) {. async . } =
while node . started :
# Keep all connected peers alive while running
trace " Running keepalive "
# First get a list of connected peer infos
2022-11-24 13:11:23 +00:00
let peers = node . peerManager . peerStore . peers ( )
. filterIt ( it . connectedness = = Connected )
2022-10-18 14:05:53 +00:00
. mapIt ( it . toRemotePeerInfo ( ) )
for peer in peers :
2023-01-23 20:24:46 +00:00
try :
let conn = await node . switch . dial ( peer . peerId , peer . addrs , PingCodec )
let pingDelay = await node . libp2pPing . ping ( conn )
except CatchableError as exc :
2022-10-18 14:05:53 +00:00
waku_node_errors . inc ( labelValues = [ " keep_alive_failure " ] )
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
await sleepAsync ( keepalive )
proc startKeepalive * ( node : WakuNode ) =
let defaultKeepalive = 2 . minutes # 20% of the default chronosstream timeout duration
info " starting keepalive " , keepalive = defaultKeepalive
asyncSpawn node . keepaliveLoop ( defaultKeepalive )
proc runDiscv5Loop ( node : WakuNode ) {. async . } =
## Continuously add newly discovered nodes
## using Node Discovery v5
if ( node . wakuDiscv5 . isNil ) :
warn " Trying to run discovery v5 while it ' s disabled "
return
info " Starting discovery loop "
while node . wakuDiscv5 . listening :
trace " Running discovery loop "
2023-01-23 20:24:46 +00:00
let discoveredPeersRes = await node . wakuDiscv5 . findRandomPeers ( )
2022-11-21 08:36:41 +00:00
2023-01-23 20:24:46 +00:00
if discoveredPeersRes . isOk :
let discoveredPeers = discoveredPeersRes . get
let newSeen = discoveredPeers . countIt ( not node . peerManager . peerStore [ AddressBook ] . contains ( it . peerId ) )
info " Discovered peers " , discovered = discoveredPeers . len , new = newSeen
2022-10-18 14:05:53 +00:00
2023-01-23 20:24:46 +00:00
# Add all peers, new ones and already seen (in case their addresses changed)
for peer in discoveredPeers :
2023-01-18 14:17:56 +00:00
# TODO: proto: WakuRelayCodec will be removed from add peer
node . peerManager . addPeer ( peer , WakuRelayCodec )
2022-10-18 14:05:53 +00:00
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
#
# Also, give some time to dial the discovered nodes and update stats etc
await sleepAsync ( 5 . seconds )
proc startDiscv5 * ( node : WakuNode ) : Future [ bool ] {. async . } =
## Start Discovery v5 service
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
info " Starting discovery v5 service "
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
if not node . wakuDiscv5 . isNil ( ) :
2022-10-18 14:05:53 +00:00
## First start listening on configured port
try :
trace " Start listening on discv5 port "
node . wakuDiscv5 . open ( )
except CatchableError :
error " Failed to start discovery service. UDP port may be already in use "
return false
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
## Start Discovery v5
trace " Start discv5 service "
node . wakuDiscv5 . start ( )
trace " Start discovering new peers using discv5 "
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
asyncSpawn node . runDiscv5Loop ( )
debug " Successfully started discovery v5 service "
info " Discv5: discoverable ENR " , enr = node . wakuDiscV5 . protocol . localNode . record . toUri ( )
return true
return false
proc stopDiscv5 * ( node : WakuNode ) : Future [ bool ] {. async . } =
## Stop Discovery v5 service
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
if not node . wakuDiscv5 . isNil ( ) :
2022-10-18 14:05:53 +00:00
info " Stopping discovery v5 service "
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
## Stop Discovery v5 process and close listening port
if node . wakuDiscv5 . listening :
trace " Stop listening on discv5 port "
await node . wakuDiscv5 . closeWait ( )
debug " Successfully stopped discovery v5 service "
2022-10-27 15:29:09 +00:00
2022-10-18 14:05:53 +00:00
proc start * ( node : WakuNode ) {. async . } =
## Starts a created Waku Node and
## all its mounted protocols.
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
waku_version . set ( 1 , labelValues = [ git_version ] )
2022-11-18 10:59:35 +00:00
info " Starting Waku node " , version = git_version
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
let peerInfo = node . switch . peerInfo
info " PeerInfo " , peerId = peerInfo . peerId , addrs = peerInfo . addrs
var listenStr = " "
for address in node . announcedAddresses :
2022-11-21 08:36:41 +00:00
var fulladdr = " [ " & $ address & " /p2p/ " & $ peerInfo . peerId & " ] "
2022-10-18 14:05:53 +00:00
listenStr & = fulladdr
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
## XXX: this should be /ip4..., / stripped?
info " Listening on " , full = listenStr
info " DNS: discoverable ENR " , enr = node . enr . toUri ( )
# Perform relay-specific startup tasks TODO: this should be rethought
2022-10-27 15:29:09 +00:00
if not node . wakuRelay . isNil ( ) :
2022-10-18 14:05:53 +00:00
await node . startRelay ( )
2022-11-21 08:36:41 +00:00
2022-10-28 09:51:46 +00:00
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper =
proc ( listenAddrs : seq [ MultiAddress ] ) : Future [ seq [ MultiAddress ] ] {. async . } =
return node . announcedAddresses
node . switch . peerInfo . addressMappers . add ( addressMapper )
## The switch will update addresses after start using the addressMapper
await node . switch . start ( )
2022-10-18 14:05:53 +00:00
node . started = true
2022-11-21 08:36:41 +00:00
2022-10-18 14:05:53 +00:00
info " Node started successfully "
proc stop * ( node : WakuNode ) {. async . } =
2022-10-27 15:29:09 +00:00
if not node . wakuRelay . isNil ( ) :
2022-10-18 14:05:53 +00:00
await node . wakuRelay . stop ( )
2022-11-21 08:36:41 +00:00
2022-10-27 15:29:09 +00:00
if not node . wakuDiscv5 . isNil ( ) :
2022-10-18 14:05:53 +00:00
discard await node . stopDiscv5 ( )
await node . switch . stop ( )
2023-01-26 09:20:20 +00:00
node . peerManager . stop ( )
2022-10-18 14:05:53 +00:00
2022-11-21 08:36:41 +00:00
node . started = false