2021-03-11 16:27:12 -04:00
package node
import (
"context"
2023-01-13 19:58:22 -04:00
"math/rand"
2021-11-17 12:19:42 -04:00
"net"
2021-11-23 11:03:12 -04:00
"sync"
2021-03-15 19:59:18 -04:00
"time"
2021-03-11 16:27:12 -04:00
2023-05-12 17:52:42 -04:00
backoffv4 "github.com/cenkalti/backoff/v4"
2023-02-24 11:58:49 -04:00
golog "github.com/ipfs/go-log/v2"
2022-12-09 18:09:06 +00:00
"github.com/libp2p/go-libp2p"
"go.uber.org/zap"
2023-10-16 12:28:54 -04:00
"golang.org/x/exp/maps"
2022-12-09 18:09:06 +00:00
2022-06-13 14:30:35 -04:00
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
2021-08-31 14:19:49 -04:00
2022-10-19 15:39:32 -04:00
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
2023-02-16 12:17:52 -04:00
"github.com/libp2p/go-libp2p/core/protocol"
2023-05-12 17:52:42 -04:00
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
2023-06-05 10:39:38 -04:00
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
2023-05-12 17:52:42 -04:00
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
2022-05-27 15:55:35 -04:00
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
2021-03-11 16:27:12 -04:00
ma "github.com/multiformats/go-multiaddr"
2021-04-21 20:09:37 -04:00
2022-11-09 15:53:01 -04:00
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
2023-11-13 19:17:43 +07:00
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
2023-08-03 21:51:15 +05:30
"github.com/waku-org/go-waku/waku/v2/peermanager"
2023-08-10 18:28:22 +05:30
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
2023-09-14 20:30:06 +05:30
wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
2023-04-20 14:51:13 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
2022-11-09 15:53:01 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
2023-10-15 15:16:40 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
2022-11-09 15:53:01 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
2023-03-09 11:48:25 -04:00
"github.com/waku-org/go-waku/waku/v2/rendezvous"
2023-11-07 22:43:19 +05:30
"github.com/waku-org/go-waku/waku/v2/service"
2022-12-08 23:08:04 -04:00
"github.com/waku-org/go-waku/waku/v2/timesource"
2022-11-09 15:53:01 -04:00
"github.com/waku-org/go-waku/waku/v2/utils"
2021-03-11 16:27:12 -04:00
)
2023-07-07 11:51:15 -04:00
// discoveryConnectTimeout is the duration handed to
// peermanager.NewPeerConnectionStrategy when the node is built.
const discoveryConnectTimeout time.Duration = 20 * time.Second
2021-11-10 14:36:51 +01:00
type Peer struct {
2023-12-06 07:17:59 +05:30
ID peer . ID ` json:"peerID" `
Protocols [ ] protocol . ID ` json:"protocols" `
Addrs [ ] ma . Multiaddr ` json:"addrs" `
Connected bool ` json:"connected" `
PubsubTopics [ ] string ` json:"pubsubTopics" `
2021-11-10 14:36:51 +01:00
}
2022-03-18 12:56:34 -07:00
// storeFactory builds the store.Store implementation for a given node. It
// lets the store be swapped out through the node parameters.
type storeFactory func(w *WakuNode) store.Store
2023-04-10 11:20:07 -04:00
type (
	// byte32 is shorthand for a fixed 32-byte array.
	byte32 = [32]byte

	// IdentityCredential groups the four 32-byte values that make up an
	// identity credential, as returned by RLNRelay.IdentityCredential.
	IdentityCredential = struct {
		IDTrapdoor   byte32 `json:"idTrapdoor"`
		IDNullifier  byte32 `json:"idNullifier"`
		IDSecretHash byte32 `json:"idSecretHash"`
		IDCommitment byte32 `json:"idCommitment"`
	}
)
2023-09-07 17:39:10 -04:00
// SpamHandler is the callback type accepted by RLNRelay.Validator. It
// receives a message together with the topic it arrived on and may return an
// error.
type SpamHandler = func(message *pb.WakuMessage, topic string) error
2023-08-21 16:54:13 -04:00
2022-08-12 08:44:13 -04:00
type RLNRelay interface {
2023-04-10 11:20:07 -04:00
IdentityCredential ( ) ( IdentityCredential , error )
2023-08-24 14:42:50 -04:00
MembershipIndex ( ) uint
2022-08-12 08:44:13 -04:00
AppendRLNProof ( msg * pb . WakuMessage , senderEpochTime time . Time ) error
2023-09-07 17:39:10 -04:00
Validator ( spamHandler SpamHandler ) func ( ctx context . Context , message * pb . WakuMessage , topic string ) bool
2023-08-21 16:54:13 -04:00
Start ( ctx context . Context ) error
2023-08-18 09:59:37 -04:00
Stop ( ) error
2023-09-11 17:34:56 -04:00
IsReady ( ctx context . Context ) ( bool , error )
2022-08-12 08:44:13 -04:00
}
2021-03-11 16:27:12 -04:00
type WakuNode struct {
2022-12-08 23:08:04 -04:00
host host . Host
opts * WakuNodeParameters
log * zap . Logger
timesource timesource . Timesource
2023-08-15 21:40:00 -04:00
metrics Metrics
2021-04-14 22:19:31 -04:00
2023-07-07 08:35:22 -04:00
peerstore peerstore . Peerstore
2023-08-03 21:51:15 +05:30
peerConnector * peermanager . PeerConnectionStrategy
2023-06-05 10:39:38 -04:00
2023-04-11 10:38:16 -04:00
relay Service
lightPush Service
discoveryV5 Service
peerExchange Service
rendezvous Service
2023-10-15 15:16:40 -04:00
metadata Service
2023-08-14 23:29:00 +03:00
filterFullNode ReceptorService
filterLightNode Service
2023-04-11 10:38:16 -04:00
store ReceptorService
rlnRelay RLNRelay
2023-01-06 18:37:57 -04:00
2023-05-12 17:52:42 -04:00
wakuFlag enr . WakuEnrBitfield
circuitRelayNodes chan peer . AddrInfo
2021-06-24 09:02:53 -04:00
2022-06-13 14:30:35 -04:00
localNode * enode . LocalNode
2023-05-05 15:19:15 +05:30
bcaster relay . Broadcaster
2021-04-14 22:19:31 -04:00
2024-02-08 15:24:58 +05:30
connectionNotif ConnectionNotifier
addressChangesSub event . Subscription
enrChangeCh chan struct { }
2021-06-16 13:14:22 +03:00
2021-11-24 16:11:24 -04:00
keepAliveMutex sync . Mutex
keepAliveFails map [ peer . ID ] int
2021-04-18 19:41:42 -04:00
cancel context . CancelFunc
2021-11-23 11:03:12 -04:00
wg * sync . WaitGroup
2021-06-16 13:14:22 +03:00
2022-03-18 12:56:34 -07:00
storeFactory storeFactory
2023-08-03 21:51:15 +05:30
peermanager * peermanager . PeerManager
2022-03-18 12:56:34 -07:00
}
func defaultStoreFactory ( w * WakuNode ) store . Store {
2023-08-15 21:40:00 -04:00
return store . NewWakuStore ( w . opts . messageProvider , w . peermanager , w . timesource , w . opts . prometheusReg , w . log )
2021-03-11 16:27:12 -04:00
}
2022-04-25 23:31:26 +04:00
// New is used to instantiate a WakuNode using a set of WakuNodeOptions.
//
// DefaultWakuNodeOptions are applied first, followed by opts in the order
// given; the first option returning an error aborts construction. New only
// accumulates libp2p options and builds the protocol instances: the libp2p
// host itself is not created here.
2023-01-06 18:37:57 -04:00
func New ( opts ... WakuNodeOption ) ( * WakuNode , error ) {
2023-05-15 14:44:36 -04:00
var err error
2021-04-18 19:41:42 -04:00
params := new ( WakuNodeParameters )
params . libP2POpts = DefaultLibP2POptions
2021-11-17 12:19:42 -04:00
// Defaults go first so that caller supplied options can override them.
opts = append ( DefaultWakuNodeOptions , opts ... )
2021-04-18 19:41:42 -04:00
for _ , opt := range opts {
err := opt ( params )
2021-03-11 16:27:12 -04:00
if err != nil {
return nil , err
}
}
2023-02-01 19:35:31 -04:00
// Fall back to the shared logger; the global go-log level is only touched
// in this fallback case.
if params . logger == nil {
params . logger = utils . Logger ( )
2023-06-05 10:39:38 -04:00
//golog.SetPrimaryCore(params.logger.Core())
2023-02-24 11:58:49 -04:00
golog . SetAllLoggers ( params . logLevel )
2023-02-01 19:35:31 -04:00
}
2022-06-13 14:30:35 -04:00
// Generate a fresh private key when none was configured.
if params . privKey == nil {
prvKey , err := crypto . GenerateKey ( )
if err != nil {
return nil , err
}
params . privKey = prvKey
}
2022-03-22 09:12:58 -04:00
if params . enableWSS {
params . libP2POpts = append ( params . libP2POpts , libp2p . Transport ( ws . New , ws . WithTLSConfig ( params . tlsConfig ) ) )
2022-07-25 11:49:10 -04:00
} else {
// Enable WS transport by default
2022-03-22 09:12:58 -04:00
params . libP2POpts = append ( params . libP2POpts , libp2p . Transport ( ws . New ) )
}
2021-11-17 12:19:42 -04:00
// Setting default host address if none was provided
if params . hostAddr == nil {
2023-05-15 14:44:36 -04:00
params . hostAddr , err = net . ResolveTCPAddr ( "tcp" , "0.0.0.0:0" )
if err != nil {
return nil , err
}
err = WithHostAddress ( params . hostAddr ) ( params )
2021-11-17 12:19:42 -04:00
if err != nil {
return nil , err
}
}
2023-05-15 14:44:36 -04:00
2021-04-18 19:41:42 -04:00
if len ( params . multiAddr ) > 0 {
params . libP2POpts = append ( params . libP2POpts , libp2p . ListenAddrs ( params . multiAddr ... ) )
}
2021-03-11 16:27:12 -04:00
2022-06-13 14:30:35 -04:00
params . libP2POpts = append ( params . libP2POpts , params . Identity ( ) )
2021-03-15 12:07:23 -04:00
2021-10-14 22:15:02 -04:00
if params . addressFactory != nil {
params . libP2POpts = append ( params . libP2POpts , libp2p . AddrsFactory ( params . addressFactory ) )
}
2021-03-11 16:27:12 -04:00
// Build the node shell and its in-memory bookkeeping.
w := new ( WakuNode )
2023-05-05 15:19:15 +05:30
w . bcaster = relay . NewBroadcaster ( 1024 )
2021-04-18 19:41:42 -04:00
w . opts = params
2022-01-18 14:17:06 -04:00
w . log = params . logger . Named ( "node2" )
2021-11-23 11:03:12 -04:00
w . wg = & sync . WaitGroup { }
2021-11-24 16:11:24 -04:00
w . keepAliveFails = make ( map [ peer . ID ] int )
2024-02-20 08:47:37 -04:00
w . wakuFlag = enr . NewWakuEnrBitfield ( w . opts . enableLightPush , w . opts . enableFilterFullNode , w . opts . enableStore , w . opts . enableRelay )
2023-05-12 17:52:42 -04:00
w . circuitRelayNodes = make ( chan peer . AddrInfo )
2023-08-15 21:40:00 -04:00
w . metrics = newMetrics ( params . prometheusReg )
w . metrics . RecordVersion ( Version , GitCommit )
2023-05-12 17:52:42 -04:00
2023-06-05 10:39:38 -04:00
// Setup peerstore wrapper
if params . peerstore != nil {
2023-08-10 18:28:22 +05:30
w . peerstore = wps . NewWakuPeerstore ( params . peerstore )
2023-06-05 10:39:38 -04:00
params . libP2POpts = append ( params . libP2POpts , libp2p . Peerstore ( w . peerstore ) )
} else {
ps , err := pstoremem . NewPeerstore ( )
if err != nil {
return nil , err
}
2023-08-10 18:28:22 +05:30
w . peerstore = wps . NewWakuPeerstore ( ps )
2023-06-05 10:39:38 -04:00
params . libP2POpts = append ( params . libP2POpts , libp2p . Peerstore ( w . peerstore ) )
}
2023-05-12 17:52:42 -04:00
// Use circuit relay with nodes received on circuitRelayNodes channel
// The peer source forwards at most numPeers entries from
// w.circuitRelayNodes into a fresh channel, and closes that channel when the
// budget is used up, the source channel is closed or ctx is done.
params . libP2POpts = append ( params . libP2POpts , libp2p . EnableAutoRelayWithPeerSource (
func ( ctx context . Context , numPeers int ) <- chan peer . AddrInfo {
r := make ( chan peer . AddrInfo )
go func ( ) {
defer close ( r )
for ; numPeers != 0 ; numPeers -- {
select {
case v , ok := <- w . circuitRelayNodes :
if ! ok {
return
}
select {
case r <- v :
case <- ctx . Done ( ) :
return
}
case <- ctx . Done ( ) :
return
}
}
} ( )
return r
} ,
2023-09-20 02:54:16 -04:00
autorelay . WithMinInterval ( params . circuitRelayMinInterval ) ,
autorelay . WithBootDelay ( params . circuitRelayBootDelay ) ,
2023-05-12 17:52:42 -04:00
) )
2021-06-16 13:14:22 +03:00
2022-12-08 23:08:04 -04:00
if params . enableNTP {
w . timesource = timesource . NewNTPTimesource ( w . opts . ntpURLs , w . log )
} else {
w . timesource = timesource . NewDefaultClock ( )
}
2023-04-25 12:09:55 -04:00
// NOTE(review): a failure here is only logged; construction continues and
// w.localNode is still handed to the components created below.
w . localNode , err = enr . NewLocalnode ( w . opts . privKey )
2023-01-06 18:37:57 -04:00
if err != nil {
w . log . Error ( "creating localnode" , zap . Error ( err ) )
}
2023-08-15 06:57:51 +05:30
2024-03-14 10:21:47 -04:00
metadata := metadata . NewWakuMetadata ( w . opts . clusterID , w . localNode , w . log )
w . metadata = metadata
2023-10-15 15:16:40 -04:00
2023-08-03 21:51:15 +05:30
//Initialize peer manager.
2024-03-14 10:21:47 -04:00
w . peermanager = peermanager . NewPeerManager ( w . opts . maxPeerConnections , w . opts . peerStoreCapacity , metadata , w . log )
2023-01-06 18:37:57 -04:00
2023-08-30 21:33:57 +07:00
// NOTE(review): as with the local node, an error here is logged but not
// returned.
w . peerConnector , err = peermanager . NewPeerConnectionStrategy ( w . peermanager , discoveryConnectTimeout , w . log )
2023-01-13 19:58:22 -04:00
if err != nil {
w . log . Error ( "creating peer connection strategy" , zap . Error ( err ) )
}
2023-01-06 18:37:57 -04:00
if w . opts . enableDiscV5 {
err := w . mountDiscV5 ( )
if err != nil {
return nil , err
}
}
2023-08-15 21:40:00 -04:00
// w.DiscV5() yields nil when discv5 was not mounted above.
w . peerExchange , err = peer_exchange . NewWakuPeerExchange ( w . DiscV5 ( ) , w . peerConnector , w . peermanager , w . opts . prometheusReg , w . log )
2023-01-10 20:52:10 -04:00
if err != nil {
return nil , err
}
2023-01-06 18:37:57 -04:00
2023-07-27 13:04:08 -04:00
w . rendezvous = rendezvous . NewRendezvous ( w . opts . rendezvousDB , w . peerConnector , w . log )
2024-01-03 07:06:41 +05:30
w . relay = relay . NewWakuRelay ( w . bcaster , w . opts . minRelayPeersToPublish , w . timesource , w . opts . prometheusReg , w . log ,
relay . WithPubSubOptions ( w . opts . pubsubOpts ) ,
relay . WithMaxMsgSize ( w . opts . maxMsgSizeBytes ) )
2023-09-14 17:24:34 -04:00
2023-08-21 16:54:13 -04:00
// RLN relay setup is attempted whenever relay is enabled.
if w . opts . enableRelay {
err = w . setupRLNRelay ( )
if err != nil {
return nil , err
}
}
2023-11-07 22:43:19 +05:30
w . opts . filterOpts = append ( w . opts . filterOpts , filter . WithPeerManager ( w . peermanager ) )
2023-10-16 22:12:01 +05:30
2023-08-15 21:40:00 -04:00
w . filterFullNode = filter . NewWakuFilterFullNode ( w . timesource , w . opts . prometheusReg , w . log , w . opts . filterOpts ... )
w . filterLightNode = filter . NewWakuFilterLightNode ( w . bcaster , w . peermanager , w . timesource , w . opts . prometheusReg , w . log )
w . lightPush = lightpush . NewWakuLightPush ( w . Relay ( ) , w . peermanager , w . opts . prometheusReg , w . log )
2023-01-10 08:43:26 -04:00
2022-03-18 12:56:34 -07:00
// Only the factory is selected here; the store itself is not built in New.
if params . storeFactory != nil {
w . storeFactory = params . storeFactory
} else {
w . storeFactory = defaultStoreFactory
}
2024-02-08 15:24:58 +05:30
if params . topicHealthNotifCh != nil {
w . peermanager . TopicHealthNotifCh = params . topicHealthNotifCh
2021-06-28 10:14:28 -04:00
}
2021-10-06 11:34:39 -04:00
2021-10-04 22:13:54 -04:00
return w , nil
}
2023-01-11 22:20:23 -04:00
func ( w * WakuNode ) watchMultiaddressChanges ( ctx context . Context ) {
2021-11-23 11:03:12 -04:00
defer w . wg . Done ( )
2023-10-16 12:28:54 -04:00
addrsSet := utils . MultiAddrSet ( w . ListenAddresses ( ) ... )
2021-11-17 12:19:42 -04:00
first := make ( chan struct { } , 1 )
first <- struct { } { }
for {
select {
2023-01-06 18:37:57 -04:00
case <- ctx . Done ( ) :
2021-11-17 12:19:42 -04:00
return
case <- first :
2023-10-16 12:28:54 -04:00
addr := maps . Keys ( addrsSet )
w . log . Info ( "listening" , logging . MultiAddrs ( "multiaddr" , addr ... ) )
2021-11-17 12:19:42 -04:00
case <- w . addressChangesSub . Out ( ) :
2023-10-16 12:28:54 -04:00
newAddrs := utils . MultiAddrSet ( w . ListenAddresses ( ) ... )
if ! maps . Equal ( addrsSet , newAddrs ) {
addrsSet = newAddrs
addrs := maps . Keys ( addrsSet )
2022-06-13 14:30:35 -04:00
w . log . Info ( "listening addresses update received" , logging . MultiAddrs ( "multiaddr" , addrs ... ) )
2023-09-08 15:19:23 -04:00
err := w . setupENR ( ctx , addrs )
if err != nil {
w . log . Warn ( "could not update ENR" , zap . Error ( err ) )
}
2021-11-17 12:19:42 -04:00
}
}
}
}
2022-04-25 23:31:26 +04:00
// Start initializes all the protocols that were setup in the WakuNode.
//
// It creates the libp2p host from the accumulated libp2p options plus a
// connection gater, launches the node's background goroutines and then starts
// every protocol enabled in the node options. Metadata, relay, store,
// lightpush, both filter services, peer exchange and rendezvous receive the
// host through SetHost whether or not they are started. The first error
// aborts the sequence and is returned.
//
// NOTE(review): on an error return nothing that was already started is torn
// down here and w.cancel stays set — confirm how callers are expected to
// clean up.
2023-01-06 18:37:57 -04:00
func ( w * WakuNode ) Start ( ctx context . Context ) error {
2024-01-08 15:05:21 -04:00
connGater := peermanager . NewConnectionGater ( w . opts . maxConnectionsPerIP , w . log )
2023-06-05 10:39:38 -04:00
2023-01-06 18:37:57 -04:00
// Everything started below runs on a context that Stop cancels.
ctx , cancel := context . WithCancel ( ctx )
w . cancel = cancel
2022-12-08 23:08:04 -04:00
2023-07-06 17:47:25 -04:00
libP2POpts := append ( w . opts . libP2POpts , libp2p . ConnectionGater ( connGater ) )
host , err := libp2p . New ( libP2POpts ... )
2023-04-16 20:04:12 -04:00
if err != nil {
return err
}
2023-06-05 10:39:38 -04:00
// Tell the gater about disconnections, in a separate goroutine per event.
host . Network ( ) . Notify ( & network . NotifyBundle {
DisconnectedF : func ( net network . Network , conn network . Conn ) {
go connGater . NotifyDisconnect ( conn . RemoteMultiaddr ( ) )
} ,
} )
2023-04-16 20:04:12 -04:00
w . host = host
if w . addressChangesSub , err = host . EventBus ( ) . Subscribe ( new ( event . EvtLocalAddressesUpdated ) ) ; err != nil {
return err
}
2023-08-15 21:40:00 -04:00
w . connectionNotif = NewConnectionNotifier ( ctx , w . host , w . opts . connNotifCh , w . metrics , w . log )
2023-01-06 18:37:57 -04:00
w . host . Network ( ) . Notify ( w . connectionNotif )
2023-02-15 14:43:51 -04:00
w . enrChangeCh = make ( chan struct { } , 10 )
2023-05-12 17:52:42 -04:00
// One wait group slot per goroutine launched right below; each is presumably
// released by the goroutine itself — confirm for connectednessListener and
// findRelayNodes.
w . wg . Add ( 4 )
2023-01-06 18:37:57 -04:00
go w . connectednessListener ( ctx )
2023-01-11 22:20:23 -04:00
go w . watchMultiaddressChanges ( ctx )
go w . watchENRChanges ( ctx )
2023-05-12 17:52:42 -04:00
go w . findRelayNodes ( ctx )
2023-01-06 18:37:57 -04:00
2023-04-16 20:04:12 -04:00
err = w . bcaster . Start ( ctx )
2023-04-14 17:50:44 -04:00
if err != nil {
return err
}
2023-01-06 18:37:57 -04:00
// Keep-alive only runs for a strictly positive interval.
if w . opts . keepAliveInterval > time . Duration ( 0 ) {
w . wg . Add ( 1 )
2023-01-08 14:33:30 -04:00
go w . startKeepAlive ( ctx , w . opts . keepAliveInterval )
2022-06-19 17:47:39 -04:00
}
2021-12-06 11:49:13 +01:00
2023-10-15 15:16:40 -04:00
// Metadata is started unconditionally.
w . metadata . SetHost ( host )
err = w . metadata . Start ( ctx )
if err != nil {
return err
}
2023-04-16 20:04:12 -04:00
w . peerConnector . SetHost ( host )
2023-08-16 17:55:58 +05:30
w . peermanager . SetHost ( host )
2023-04-14 17:50:44 -04:00
err = w . peerConnector . Start ( ctx )
2023-01-13 19:58:22 -04:00
if err != nil {
return err
}
2023-01-06 18:37:57 -04:00
if w . opts . enableNTP {
err := w . timesource . Start ( ctx )
2022-12-08 23:08:04 -04:00
if err != nil {
return err
}
2021-04-18 19:41:42 -04:00
}
2023-08-24 10:25:17 -04:00
if w . opts . enableRLN {
err = w . startRlnRelay ( ctx )
if err != nil {
return err
}
}
2023-04-16 20:04:12 -04:00
w . relay . SetHost ( host )
2023-08-10 18:28:22 +05:30
2023-01-06 18:37:57 -04:00
// The peer manager and reachability monitoring are only started together
// with relay.
if w . opts . enableRelay {
err := w . relay . Start ( ctx )
2021-12-08 13:00:20 +00:00
if err != nil {
return err
}
2023-09-19 11:35:29 +05:30
err = w . peermanager . SubscribeToRelayEvtBus ( w . relay . ( * relay . WakuRelay ) . Events ( ) )
if err != nil {
return err
}
2023-08-03 21:51:15 +05:30
w . peermanager . Start ( ctx )
2023-08-20 18:06:35 +05:30
w . registerAndMonitorReachability ( ctx )
2021-11-17 12:19:42 -04:00
}
2023-01-06 18:37:57 -04:00
// The store instance is created here, not in New.
w . store = w . storeFactory ( w )
2023-04-16 20:04:12 -04:00
w . store . SetHost ( host )
2023-01-06 18:37:57 -04:00
if w . opts . enableStore {
2023-05-05 15:19:15 +05:30
sub := w . bcaster . RegisterForAll ( )
err := w . startStore ( ctx , sub )
2022-10-23 09:13:43 -04:00
if err != nil {
return err
}
2023-01-06 18:37:57 -04:00
w . log . Info ( "Subscribing store to broadcaster" )
2021-11-17 12:19:42 -04:00
}
2023-04-16 20:04:12 -04:00
w . lightPush . SetHost ( host )
2023-01-06 18:37:57 -04:00
if w . opts . enableLightPush {
if err := w . lightPush . Start ( ctx ) ; err != nil {
2022-12-08 23:08:04 -04:00
return err
}
2021-06-16 13:14:22 +03:00
}
2023-08-14 23:29:00 +03:00
w . filterFullNode . SetHost ( host )
2023-04-16 20:04:12 -04:00
if w . opts . enableFilterFullNode {
2023-05-05 15:19:15 +05:30
sub := w . bcaster . RegisterForAll ( )
2023-08-14 23:29:00 +03:00
err := w . filterFullNode . Start ( ctx , sub )
2023-02-07 18:28:46 -04:00
if err != nil {
return err
}
w . log . Info ( "Subscribing filterV2 to broadcaster" )
2023-05-05 15:19:15 +05:30
2023-02-08 19:33:06 -04:00
}
2023-08-14 23:29:00 +03:00
w . filterLightNode . SetHost ( host )
2023-04-11 10:38:16 -04:00
if w . opts . enableFilterLightNode {
2023-08-14 23:29:00 +03:00
err := w . filterLightNode . Start ( ctx )
2023-02-08 19:33:06 -04:00
if err != nil {
return err
}
2023-02-07 18:28:46 -04:00
}
2023-01-13 19:58:22 -04:00
// Set up the ENR from the host's current listen addresses.
err = w . setupENR ( ctx , w . ListenAddresses ( ) )
2023-01-06 18:37:57 -04:00
if err != nil {
return err
}
2023-04-16 20:04:12 -04:00
w . peerExchange . SetHost ( host )
2023-01-06 18:37:57 -04:00
if w . opts . enablePeerExchange {
err := w . peerExchange . Start ( ctx )
if err != nil {
2021-11-01 08:38:03 -04:00
return err
}
2021-04-28 16:23:03 -04:00
}
2023-04-16 20:04:12 -04:00
w . rendezvous . SetHost ( host )
2023-07-31 14:58:50 -04:00
if w . opts . enableRendezvousPoint {
2023-03-09 11:48:25 -04:00
err := w . rendezvous . Start ( ctx )
if err != nil {
return err
}
}
2021-10-04 22:13:54 -04:00
return nil
2021-03-11 16:27:12 -04:00
}
2022-04-25 23:31:26 +04:00
// Stop stops the WakuNode and closes all connections to the host.
//
// It returns immediately when w.cancel is nil, and resets w.cancel to nil
// once teardown is complete, so a second call in a row does nothing.
2021-03-22 12:45:13 -04:00
func ( w * WakuNode ) Stop ( ) {
2023-01-06 18:37:57 -04:00
if w . cancel == nil {
return
}
2023-04-14 17:50:44 -04:00
w . bcaster . Stop ( )
2021-11-01 10:42:55 -04:00
2021-10-06 11:34:39 -04:00
// Deferred so that both are closed only after the rest of the teardown,
// including the wait for background goroutines, has finished.
defer w . connectionNotif . Close ( )
2021-11-17 12:19:42 -04:00
defer w . addressChangesSub . Close ( )
2021-06-24 09:02:53 -04:00
2023-10-15 15:16:40 -04:00
w . host . Network ( ) . StopNotify ( w . connectionNotif )
2023-01-06 18:37:57 -04:00
w . relay . Stop ( )
w . lightPush . Stop ( )
w . store . Stop ( )
2023-08-14 23:29:00 +03:00
w . filterFullNode . Stop ( )
2023-12-21 08:44:51 +08:00
w . filterLightNode . Stop ( )
2022-10-23 09:13:43 -04:00
2023-01-06 18:37:57 -04:00
// discoveryV5 is only assigned when discv5 is enabled.
if w . opts . enableDiscV5 {
2022-10-23 09:13:43 -04:00
w . discoveryV5 . Stop ( )
}
2023-05-18 23:22:19 +08:00
w . peerExchange . Stop ( )
2023-07-27 13:04:08 -04:00
w . rendezvous . Stop ( )
2022-10-23 09:13:43 -04:00
2023-01-13 19:58:22 -04:00
w . peerConnector . Stop ( )
2022-09-12 10:13:38 -04:00
// The error from stopping the RLN relay is deliberately discarded.
_ = w . stopRlnRelay ( )
2021-10-01 14:37:52 -04:00
2023-01-06 18:37:57 -04:00
w . timesource . Stop ( )
2022-12-08 23:08:04 -04:00
2021-10-01 14:37:52 -04:00
w . host . Close ( )
2021-11-23 11:03:12 -04:00
2023-05-09 16:35:04 -04:00
// Cancel the context created in Start and wait for the goroutines tracked by
// the wait group before closing the ENR change channel.
w . cancel ( )
2021-11-23 11:03:12 -04:00
w . wg . Wait ( )
2023-02-09 09:03:05 -04:00
close ( w . enrChangeCh )
2023-04-14 17:50:44 -04:00
w . cancel = nil
2021-03-15 12:07:23 -04:00
}
2022-04-25 23:31:26 +04:00
// Host returns the libp2p Host used by the WakuNode
2021-03-15 12:07:23 -04:00
func ( w * WakuNode ) Host ( ) host . Host {
return w . host
}
2022-04-25 23:31:26 +04:00
// ID returns the base58 encoded ID from the host
2021-03-18 19:21:45 -04:00
func ( w * WakuNode ) ID ( ) string {
2024-01-12 23:10:27 +05:30
return w . host . ID ( ) . String ( )
2021-03-18 19:21:45 -04:00
}
2023-01-11 22:20:23 -04:00
func ( w * WakuNode ) watchENRChanges ( ctx context . Context ) {
defer w . wg . Done ( )
var prevNodeVal string
for {
select {
case <- ctx . Done ( ) :
return
2023-02-07 09:45:06 -04:00
case <- w . enrChangeCh :
2023-01-11 22:20:23 -04:00
if w . localNode != nil {
currNodeVal := w . localNode . Node ( ) . String ( )
if prevNodeVal != currNodeVal {
if prevNodeVal == "" {
w . log . Info ( "enr record" , logging . ENode ( "enr" , w . localNode . Node ( ) ) )
} else {
w . log . Info ( "new enr record" , logging . ENode ( "enr" , w . localNode . Node ( ) ) )
}
prevNodeVal = currNodeVal
}
}
}
}
}
2022-04-25 23:31:26 +04:00
// ListenAddresses returns all the multiaddresses used by the host
2021-09-30 19:03:19 -04:00
func ( w * WakuNode ) ListenAddresses ( ) [ ] ma . Multiaddr {
2023-07-07 11:51:15 -04:00
return utils . EncapsulatePeerID ( w . host . ID ( ) , w . host . Addrs ( ) ... )
2021-04-04 13:05:33 -04:00
}
2022-06-13 14:30:35 -04:00
// ENR returns the ENR address of the node.
//
// It returns nil when the node has no local node. That can happen because New
// only logs a failure to create the local node and carries on, so w.localNode
// may be nil.
func (w *WakuNode) ENR() *enode.Node {
	if w.localNode == nil {
		return nil
	}
	return w.localNode.Node()
}
2022-12-08 23:08:04 -04:00
// Timesource returns the time source this node uses to read the current wall
// time. New selects an NTP-backed source when NTP is enabled and the default
// clock otherwise.
func (w *WakuNode) Timesource() timesource.Timesource {
	ts := w.timesource
	return ts
}
2022-04-25 23:31:26 +04:00
// Relay is used to access any operation related to Waku Relay protocol
2021-04-28 16:10:44 -04:00
func ( w * WakuNode ) Relay ( ) * relay . WakuRelay {
2023-01-06 18:37:57 -04:00
if result , ok := w . relay . ( * relay . WakuRelay ) ; ok {
return result
}
return nil
2021-03-15 12:07:23 -04:00
}
2022-04-25 23:31:26 +04:00
// Store is used to access any operation related to Waku Store protocol
2022-03-18 12:56:34 -07:00
func ( w * WakuNode ) Store ( ) store . Store {
2023-01-06 18:37:57 -04:00
return w . store . ( store . Store )
2021-11-01 08:38:03 -04:00
}
2023-04-11 10:38:16 -04:00
// FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature
2023-08-14 23:29:00 +03:00
func ( w * WakuNode ) FilterFullNode ( ) * filter . WakuFilterFullNode {
if result , ok := w . filterFullNode . ( * filter . WakuFilterFullNode ) ; ok {
2023-04-11 10:38:16 -04:00
return result
}
return nil
}
2023-08-14 23:29:00 +03:00
// FilterFullNode is used to access any operation related to Waku Filter protocol Light node feature
func ( w * WakuNode ) FilterLightnode ( ) * filter . WakuFilterLightNode {
if result , ok := w . filterLightNode . ( * filter . WakuFilterLightNode ) ; ok {
2023-02-08 19:33:06 -04:00
return result
}
return nil
}
2023-10-27 06:21:50 +07:00
// PeerManager returns the peer manager instance created for this node.
func (w *WakuNode) PeerManager() *peermanager.PeerManager {
	pm := w.peermanager
	return pm
}
2022-04-25 23:31:26 +04:00
// Lightpush is used to access any operation related to Waku Lightpush protocol
2021-11-01 08:38:03 -04:00
func ( w * WakuNode ) Lightpush ( ) * lightpush . WakuLightPush {
2023-01-06 18:37:57 -04:00
if result , ok := w . lightPush . ( * lightpush . WakuLightPush ) ; ok {
return result
}
return nil
2021-11-01 08:38:03 -04:00
}
2022-04-25 23:31:26 +04:00
// DiscV5 is used to access any operation related to DiscoveryV5
2021-11-17 12:19:42 -04:00
func ( w * WakuNode ) DiscV5 ( ) * discv5 . DiscoveryV5 {
2023-01-06 18:37:57 -04:00
if result , ok := w . discoveryV5 . ( * discv5 . DiscoveryV5 ) ; ok {
return result
}
return nil
2021-11-17 12:19:42 -04:00
}
2022-10-23 09:13:43 -04:00
// PeerExchange is used to access any operation related to Peer Exchange
func ( w * WakuNode ) PeerExchange ( ) * peer_exchange . WakuPeerExchange {
2023-01-06 18:37:57 -04:00
if result , ok := w . peerExchange . ( * peer_exchange . WakuPeerExchange ) ; ok {
return result
}
return nil
2022-10-23 09:13:43 -04:00
}
2023-06-23 11:50:32 -04:00
// Rendezvous gives access to the operations of Rendezvous. It returns nil
// when the service is not a *rendezvous.Rendezvous.
func (w *WakuNode) Rendezvous() *rendezvous.Rendezvous {
	// A failed assertion leaves the nil zero value in rdv.
	rdv, _ := w.rendezvous.(*rendezvous.Rendezvous)
	return rdv
}
2022-04-25 23:31:26 +04:00
// Broadcaster is used to access the message broadcaster that is used to push
// messages to different protocols
2023-05-05 15:19:15 +05:30
func ( w * WakuNode ) Broadcaster ( ) relay . Broadcaster {
2021-11-18 15:20:58 +01:00
return w . bcaster
}
2021-11-17 12:19:42 -04:00
func ( w * WakuNode ) mountDiscV5 ( ) error {
discV5Options := [ ] discv5 . DiscoveryV5Option {
discv5 . WithBootnodes ( w . opts . discV5bootnodes ) ,
discv5 . WithUDPPort ( w . opts . udpPort ) ,
discv5 . WithAutoUpdate ( w . opts . discV5autoUpdate ) ,
}
2023-02-08 12:02:06 -04:00
if w . opts . advertiseAddrs != nil {
discV5Options = append ( discV5Options , discv5 . WithAdvertiseAddr ( w . opts . advertiseAddrs ) )
2022-03-10 18:14:50 -04:00
}
2022-03-29 20:02:33 -04:00
var err error
2023-11-07 22:43:19 +05:30
discv5Inst , err := discv5 . NewDiscoveryV5 ( w . opts . privKey , w . localNode , w . peerConnector , w . opts . prometheusReg , w . log , discV5Options ... )
w . discoveryV5 = discv5Inst
w . peermanager . SetDiscv5 ( discv5Inst )
2021-11-17 12:19:42 -04:00
2021-12-08 14:21:30 +00:00
return err
2021-11-17 12:19:42 -04:00
}
2023-10-21 01:26:18 +05:30
func ( w * WakuNode ) startStore ( ctx context . Context , sub * relay . Subscription ) error {
2023-05-05 15:19:15 +05:30
err := w . store . Start ( ctx , sub )
2022-12-08 23:08:04 -04:00
if err != nil {
w . log . Error ( "starting store" , zap . Error ( err ) )
return err
}
2021-09-06 09:10:19 -04:00
2022-12-08 23:08:04 -04:00
return nil
2021-03-18 19:21:45 -04:00
}
2023-08-10 18:28:22 +05:30
// AddPeer is used to add a peer and the protocols it support to the node peerstore
2023-09-14 20:30:06 +05:30
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
func ( w * WakuNode ) AddPeer ( address ma . Multiaddr , origin wps . Origin , pubSubTopics [ ] string , protocols ... protocol . ID ) ( peer . ID , error ) {
2023-11-14 04:22:46 +05:30
pData , err := w . peermanager . AddPeer ( address , origin , pubSubTopics , protocols ... )
if err != nil {
return "" , err
}
return pData . AddrInfo . ID , nil
2021-08-31 14:19:49 -04:00
}
2021-06-10 15:59:51 +03:00
2023-08-10 18:28:22 +05:30
// AddDiscoveredPeer to add a discovered peer to the node peerStore
2023-09-27 12:16:37 +05:30
func ( w * WakuNode ) AddDiscoveredPeer ( ID peer . ID , addrs [ ] ma . Multiaddr , origin wps . Origin , pubsubTopics [ ] string , connectNow bool ) {
2023-11-07 22:43:19 +05:30
p := service . PeerData {
2023-08-10 18:28:22 +05:30
Origin : origin ,
AddrInfo : peer . AddrInfo {
ID : ID ,
Addrs : addrs ,
} ,
2023-11-14 04:22:46 +05:30
PubsubTopics : pubsubTopics ,
2021-09-30 12:01:53 -04:00
}
2023-09-27 12:16:37 +05:30
w . peermanager . AddDiscoveredPeer ( p , connectNow )
2021-06-28 10:14:28 -04:00
}
2022-04-25 23:31:26 +04:00
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress
2021-10-01 06:32:15 -04:00
func ( w * WakuNode ) DialPeerWithMultiAddress ( ctx context . Context , address ma . Multiaddr ) error {
2021-09-30 19:03:19 -04:00
info , err := peer . AddrInfoFromP2pAddr ( address )
if err != nil {
return err
}
2021-10-01 06:32:15 -04:00
return w . connect ( ctx , * info )
2021-09-30 19:03:19 -04:00
}
2022-04-25 23:31:26 +04:00
// DialPeer is used to connect to a peer using a string containing a multiaddress
2021-10-01 06:32:15 -04:00
func ( w * WakuNode ) DialPeer ( ctx context . Context , address string ) error {
2021-03-18 19:21:45 -04:00
p , err := ma . NewMultiaddr ( address )
if err != nil {
return err
}
info , err := peer . AddrInfoFromP2pAddr ( p )
if err != nil {
return err
}
2021-10-01 06:32:15 -04:00
return w . connect ( ctx , * info )
2021-09-06 09:10:19 -04:00
}
2023-05-10 10:13:10 -04:00
// DialPeerWithInfo connects to a peer whose address information is already
// available to the caller.
func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) error {
	err := w.connect(ctx, peerInfo)
	return err
}
2021-10-01 06:32:15 -04:00
func ( w * WakuNode ) connect ( ctx context . Context , info peer . AddrInfo ) error {
err := w . host . Connect ( ctx , info )
2021-09-06 09:10:19 -04:00
if err != nil {
2023-08-10 18:28:22 +05:30
w . host . Peerstore ( ) . ( wps . WakuPeerstore ) . AddConnFailure ( info )
2021-09-06 09:10:19 -04:00
return err
}
2021-10-16 17:50:49 -04:00
2023-09-01 16:42:46 -04:00
for _ , addr := range info . Addrs {
// TODO: this is a temporary fix
// host.Connect adds the addresses with a TempAddressTTL
// however, identify will filter out all non IP addresses
// and expire all temporary addrs. So in the meantime, let's
2023-09-20 02:54:16 -04:00
// store dns4 addresses with a RecentlyConnectedAddrTTL, otherwise
2023-09-01 16:42:46 -04:00
// it will have trouble with the status fleet circuit relay addresses
// See https://github.com/libp2p/go-libp2p/issues/2550
_ , err := addr . ValueForProtocol ( ma . P_DNS4 )
if err == nil {
2023-09-20 02:54:16 -04:00
w . host . Peerstore ( ) . AddAddrs ( info . ID , info . Addrs , peerstore . RecentlyConnectedAddrTTL )
2023-09-01 16:42:46 -04:00
}
}
2023-08-10 18:28:22 +05:30
w . host . Peerstore ( ) . ( wps . WakuPeerstore ) . ResetConnFailures ( info )
2023-08-15 21:40:00 -04:00
w . metrics . RecordDial ( )
2021-09-06 09:10:19 -04:00
return nil
2021-03-18 19:21:45 -04:00
}
2021-03-24 16:39:12 -04:00
2022-04-25 23:31:26 +04:00
// DialPeerByID is used to connect to an already known peer
2021-10-01 06:32:15 -04:00
func ( w * WakuNode ) DialPeerByID ( ctx context . Context , peerID peer . ID ) error {
2021-08-31 15:17:56 -04:00
info := w . host . Peerstore ( ) . PeerInfo ( peerID )
2021-10-01 06:32:15 -04:00
return w . connect ( ctx , info )
2021-08-31 15:17:56 -04:00
}
2022-04-25 23:31:26 +04:00
// ClosePeerByAddress is used to disconnect from a peer using its multiaddress
2021-03-24 16:39:12 -04:00
func ( w * WakuNode ) ClosePeerByAddress ( address string ) error {
p , err := ma . NewMultiaddr ( address )
if err != nil {
return err
}
// Extract the peer ID from the multiaddr.
info , err := peer . AddrInfoFromP2pAddr ( p )
if err != nil {
return err
}
return w . ClosePeerById ( info . ID )
}
2022-04-25 23:31:26 +04:00
// ClosePeerById is used to close a connection to a peer
2021-03-24 16:39:12 -04:00
func ( w * WakuNode ) ClosePeerById ( id peer . ID ) error {
2021-09-06 09:10:19 -04:00
err := w . host . Network ( ) . ClosePeer ( id )
if err != nil {
return err
}
2021-09-06 09:34:58 -04:00
return nil
2021-03-24 16:39:12 -04:00
}
2022-04-25 23:31:26 +04:00
// PeerCount return the number of connected peers
2021-03-24 16:39:12 -04:00
func ( w * WakuNode ) PeerCount ( ) int {
2021-10-06 11:34:39 -04:00
return len ( w . host . Network ( ) . Peers ( ) )
2021-03-24 16:39:12 -04:00
}
2021-06-24 09:02:53 -04:00
2022-04-25 23:31:26 +04:00
// PeerStats returns a list of peers and the protocols supported by them
2021-11-10 14:36:51 +01:00
func ( w * WakuNode ) PeerStats ( ) PeerStats {
2021-08-31 16:51:22 -04:00
p := make ( PeerStats )
2021-10-06 11:34:39 -04:00
for _ , peerID := range w . host . Network ( ) . Peers ( ) {
protocols , err := w . host . Peerstore ( ) . GetProtocols ( peerID )
if err != nil {
continue
}
p [ peerID ] = protocols
2021-08-31 16:51:22 -04:00
}
return p
}
2022-12-09 18:09:06 +00:00
// Set the bootnodes on discv5
func ( w * WakuNode ) SetDiscV5Bootnodes ( nodes [ ] * enode . Node ) error {
w . opts . discV5bootnodes = nodes
2023-01-06 18:37:57 -04:00
return w . DiscV5 ( ) . SetBootnodes ( nodes )
2022-12-09 18:09:06 +00:00
}
2022-04-25 23:31:26 +04:00
// Peers return the list of peers, addresses, protocols supported and connection status
2021-11-10 14:36:51 +01:00
func ( w * WakuNode ) Peers ( ) ( [ ] * Peer , error ) {
var peers [ ] * Peer
for _ , peerId := range w . host . Peerstore ( ) . Peers ( ) {
connected := w . host . Network ( ) . Connectedness ( peerId ) == network . Connected
protocols , err := w . host . Peerstore ( ) . GetProtocols ( peerId )
if err != nil {
return nil , err
}
2023-07-07 11:51:15 -04:00
addrs := utils . EncapsulatePeerID ( peerId , w . host . Peerstore ( ) . Addrs ( peerId ) ... )
2023-12-06 07:17:59 +05:30
topics , err := w . host . Peerstore ( ) . ( * wps . WakuPeerstoreImpl ) . PubSubTopics ( peerId )
if err != nil {
return nil , err
}
2021-11-10 14:36:51 +01:00
peers = append ( peers , & Peer {
2023-12-06 07:17:59 +05:30
ID : peerId ,
Protocols : protocols ,
Connected : connected ,
Addrs : addrs ,
PubsubTopics : topics ,
2021-11-10 14:36:51 +01:00
} )
}
return peers , nil
}
2023-05-12 17:52:42 -04:00
2023-09-29 10:43:25 +05:30
// PeersByShard filters peers based on shard information following static sharding
func ( w * WakuNode ) PeersByStaticShard ( cluster uint16 , shard uint16 ) peer . IDSlice {
2023-09-14 20:30:06 +05:30
pTopic := wakuprotocol . NewStaticShardingPubsubTopic ( cluster , shard ) . String ( )
return w . peerstore . ( wps . WakuPeerstore ) . PeersByPubSubTopic ( pTopic )
}
2023-09-29 10:43:25 +05:30
// PeersByContentTopic returns the peers registered in the peerstore for the
// pubsub topic derived from contentTopic. It returns nil when no pubsub
// topic can be derived from the content topic.
func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
	derivedTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
	if err == nil {
		store := w.peerstore.(wps.WakuPeerstore)
		return store.PeersByPubSubTopic(derivedTopic)
	}

	return nil
}
2023-05-12 17:52:42 -04:00
// findRelayNodes repeatedly offers known peers that support the circuit v2
// hop protocol (proto.ProtoIDv2Hop) on the w.circuitRelayNodes channel.
//
// Rounds are driven by an exponential-backoff ticker (15s initial interval,
// multiplier 3, capped at one hour, never expiring). In every round the
// peer list is fetched and shuffled, and each matching peer's address info
// is sent on the channel. The function returns once ctx is done and marks
// w.wg as done on exit.
func (w *WakuNode) findRelayNodes(ctx context.Context) {
	defer w.wg.Done()

	// Feed peers more often right after the bootstrap, then backoff
	schedule := backoffv4.NewExponentialBackOff()
	schedule.InitialInterval = 15 * time.Second
	schedule.Multiplier = 3
	schedule.MaxInterval = 1 * time.Hour
	schedule.MaxElapsedTime = 0 // never stop

	ticker := backoffv4.NewTicker(schedule)
	defer ticker.Stop()

	for {
		// Wait for the next round, or stop when the context ends.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		candidates, err := w.Peers()
		if err != nil {
			w.log.Error("failed to fetch peers", zap.Error(err))
			continue
		}

		// Shuffle peers
		rand.Shuffle(len(candidates), func(a, b int) {
			candidates[a], candidates[b] = candidates[b], candidates[a]
		})

		for _, candidate := range candidates {
			info := w.Host().Peerstore().PeerInfo(candidate.ID)

			hopSupport, err := w.Host().Peerstore().SupportsProtocols(candidate.ID, proto.ProtoIDv2Hop)
			switch {
			case err != nil:
				w.log.Error("could not check supported protocols", zap.Error(err))
				continue
			case len(hopSupport) == 0:
				// Not a relay candidate.
				continue
			}

			// The send blocks until the consumer takes the peer or the
			// context ends.
			select {
			case w.circuitRelayNodes <- info:
				w.log.Debug("published auto-relay peer info", zap.Any("peer-id", candidate.ID))
			case <-ctx.Done():
				w.log.Debug("context done, auto-relay has enough peers")
				return
			}
		}
	}
}
2023-11-13 19:17:43 +07:00
// GetNodesFromDNSDiscovery queries every URL in discoveryURLs through
// dnsdisc.RetrieveNodes, using nameServer as the resolver option, and
// returns the concatenation of all nodes found.
//
// A URL whose lookup fails is logged as a warning and skipped; it does not
// abort the remaining lookups. The result is nil when nothing was found.
func GetNodesFromDNSDiscovery(logger *zap.Logger, ctx context.Context, nameServer string, discoveryURLs []string) []dnsdisc.DiscoveredNode {
	var collected []dnsdisc.DiscoveredNode

	for _, discoveryURL := range discoveryURLs {
		logger.Info("attempting DNS discovery with ", zap.String("URL", discoveryURL))

		found, err := dnsdisc.RetrieveNodes(ctx, discoveryURL, dnsdisc.WithNameserver(nameServer))
		if err != nil {
			logger.Warn("dns discovery error ", zap.Error(err))
			continue
		}

		// Only the peer info of each node is logged.
		var infos []peer.AddrInfo
		for i := range found {
			infos = append(infos, found[i].PeerInfo)
		}
		logger.Info("found dns entries ", zap.Any("nodes", infos))

		collected = append(collected, found...)
	}

	return collected
}
// GetDiscv5Option builds the discovery v5 node option from two bootnode
// sources: the textual records in discv5Nodes, parsed with enode.Parse, and
// the ENRs of dnsDiscoveredNodes (entries without an ENR are ignored).
//
// The parsed records come first in the resulting bootnode list. The first
// record that fails to parse aborts the call and its error is returned with
// a nil option.
func GetDiscv5Option(dnsDiscoveredNodes []dnsdisc.DiscoveredNode, discv5Nodes []string, port uint, autoUpdate bool) (WakuNodeOption, error) {
	var bootnodes []*enode.Node

	for _, rawRecord := range discv5Nodes {
		parsed, err := enode.Parse(enode.ValidSchemes, rawRecord)
		if err != nil {
			return nil, err
		}
		bootnodes = append(bootnodes, parsed)
	}

	for i := range dnsDiscoveredNodes {
		if record := dnsDiscoveredNodes[i].ENR; record != nil {
			bootnodes = append(bootnodes, record)
		}
	}

	option := WithDiscoveryV5(port, bootnodes, autoUpdate)
	return option, nil
}
2024-01-02 18:04:43 +05:30
// ClusterID returns the cluster ID stored in the node's options.
func (w *WakuNode) ClusterID() uint16 {
	id := w.opts.clusterID
	return id
}