// Copyright 2019 The Waku Library Authors.
//
// The Waku library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Waku library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Waku library. If not, see <http://www.gnu.org/licenses/>.
//
// This software uses the go-ethereum library, which is licensed
// under the GNU Lesser General Public License, version 3 or any later version.

package wakuv2

import (
	"bytes"
	"context"
	"crypto/ecdsa"
	"crypto/sha256"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"net"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/jellydator/ttlcache/v3"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/multiformats/go-multiaddr"

	"go.uber.org/zap"

	mapset "github.com/deckarep/golang-set"
	"golang.org/x/crypto/pbkdf2"

	gethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/rpc"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/metrics"

	"github.com/waku-org/go-waku/waku/v2/dnsdisc"
	"github.com/waku-org/go-waku/waku/v2/onlinechecker"
	"github.com/waku-org/go-waku/waku/v2/peermanager"
	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
	"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
	storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
	"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
	"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"

	"github.com/status-im/status-go/connection"
	"github.com/status-im/status-go/eth-node/types"
	"github.com/status-im/status-go/logutils"
	"github.com/status-im/status-go/timesource"
	"github.com/status-im/status-go/wakuv2/common"
	"github.com/status-im/status-go/wakuv2/persistence"

	node "github.com/waku-org/go-waku/waku/v2/node"
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

const messageQueueLimit = 1024
const requestTimeout = 30 * time.Second
const bootnodesQueryBackoffMs = 200
const bootnodesMaxRetries = 7
const cacheTTL = 20 * time.Minute
const maxHashQueryLength = 100
const hashQueryInterval = 3 * time.Second
const messageSentPeriod = 3    // in seconds
const messageExpiredPerid = 10 // in seconds
const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute

type SentEnvelope struct {
	Envelope      *protocol.Envelope
	PublishMethod PublishMethod
}

type ErrorSendingEnvelope struct {
	Error        error
	SentEnvelope SentEnvelope
}

type ITelemetryClient interface {
	PushReceivedEnvelope(receivedEnvelope *protocol.Envelope)
	PushSentEnvelope(sentEnvelope SentEnvelope)
	PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
}

// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
	node  *node.WakuNode // reference to a libp2p waku node
	appDB *sql.DB

	dnsAddressCache     map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
	dnsAddressCacheLock *sync.RWMutex                       // lock to handle access to the map

	// Filter-related
	filters       *common.Filters // Message filters installed with Subscribe function
	filterManager *FilterManager

	privateKeys map[string]*ecdsa.PrivateKey // Private key storage
	symKeys     map[string][]byte            // Symmetric key storage
	keyMu       sync.RWMutex                 // Mutex associated with key stores

	envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node
	expirations   map[uint32]mapset.Set                                     // Message expiration pool
	poolMu        sync.RWMutex                                              // Mutex to sync the message and expiration pools

	bandwidthCounter *metrics.BandwidthCounter

	protectedTopicStore *persistence.ProtectedTopicsStore
	sendQueue           chan *protocol.Envelope
	msgQueue            chan *common.ReceivedMessage // Message queue for waku messages that haven't been decoded

	topicInterest   map[string]TopicInterest // Track message verification requests and the last time each pubsub topic was verified for missing messages
	topicInterestMu sync.Mutex

	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	cfg     *Config
	options []node.WakuNodeOption

	envelopeFeed event.Feed

	storeMsgIDs   map[gethcommon.Hash]bool // Map of the currently processing ids
	storeMsgIDsMu sync.RWMutex

	sendMsgIDs   map[string]map[gethcommon.Hash]uint32
	sendMsgIDsMu sync.RWMutex

	storePeerID peer.ID

	topicHealthStatusChan   chan peermanager.TopicHealthStatus
	connectionNotifChan     chan node.PeerConnection
	connStatusSubscriptions map[string]*types.ConnStatusSubscription
	connStatusMu            sync.Mutex
	onlineChecker           *onlinechecker.DefaultOnlineChecker

	logger *zap.Logger

	// NTP Synced timesource
	timesource *timesource.NTPTimeSource

	// seededBootnodesForDiscV5 indicates whether we managed to retrieve discovery
	// bootnodes successfully
	seededBootnodesForDiscV5 bool

	// goingOnline is a channel that notifies when connectivity has changed from offline to online
	goingOnline chan struct{}

	// discV5BootstrapNodes are the addresses (enrtree URLs or ENRs) used to fetch bootstrap nodes for discovery
	discV5BootstrapNodes []string

	onHistoricMessagesRequestFailed func([]byte, peer.ID, error)
	onPeerStats                     func(types.ConnStatus)

	statusTelemetryClient ITelemetryClient

	defaultShardInfo protocol.RelayShards
}
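
// SetStatusTelemetryClient sets the telemetry client used to report sent and received envelopes.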
func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) {
	w.statusTelemetryClient = client
}
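
// newTTLCache builds the envelope cache with the configured cacheTTL and starts its expiration loop.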
func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] {
	cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL))
	go cache.Start()
	return cache
}

// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
	var err error
	if logger == nil {
		logger, err = zap.NewDevelopment()
		if err != nil {
			return nil, err
		}
	}

	if ts == nil {
		ts = timesource.Default()
	}

	cfg = setDefaults(cfg)
	if err = cfg.Validate(logger); err != nil {
		return nil, err
	}

	logger.Info("starting wakuv2 with config", zap.Any("config", cfg))

	ctx, cancel := context.WithCancel(context.Background())

	waku := &Waku{
		appDB:                           appDB,
		cfg:                             cfg,
		privateKeys:                     make(map[string]*ecdsa.PrivateKey),
		symKeys:                         make(map[string][]byte),
		envelopeCache:                   newTTLCache(),
		expirations:                     make(map[uint32]mapset.Set),
		msgQueue:                        make(chan *common.ReceivedMessage, messageQueueLimit),
		sendQueue:                       make(chan *protocol.Envelope, 1000),
		topicHealthStatusChan:           make(chan peermanager.TopicHealthStatus, 100),
		connectionNotifChan:             make(chan node.PeerConnection),
		connStatusSubscriptions:         make(map[string]*types.ConnStatusSubscription),
		topicInterest:                   make(map[string]TopicInterest),
		ctx:                             ctx,
		cancel:                          cancel,
		wg:                              sync.WaitGroup{},
		dnsAddressCache:                 make(map[string][]dnsdisc.DiscoveredNode),
		dnsAddressCacheLock:             &sync.RWMutex{},
		storeMsgIDs:                     make(map[gethcommon.Hash]bool),
		timesource:                      ts,
		storeMsgIDsMu:                   sync.RWMutex{},
		sendMsgIDs:                      make(map[string]map[gethcommon.Hash]uint32),
		sendMsgIDsMu:                    sync.RWMutex{},
		logger:                          logger,
		discV5BootstrapNodes:            cfg.DiscV5BootstrapNodes,
		onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
		onPeerStats:                     onPeerStats,
		onlineChecker:                   onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
	}

	waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
	waku.bandwidthCounter = metrics.NewBandwidthCounter()

	if nodeKey == nil {
		// No nodekey is provided, create an ephemeral key
		nodeKey, err = crypto.GenerateKey()
		if err != nil {
			return nil, fmt.Errorf("failed to generate a random go-waku private key: %v", err)
		}
	}

	hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprint(cfg.Host, ":", cfg.Port))
	if err != nil {
		return nil, fmt.Errorf("failed to setup the network interface: %v", err)
	}

	libp2pOpts := node.DefaultLibP2POptions
	libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter))
	libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())

	opts := []node.WakuNodeOption{
		node.WithLibP2POptions(libp2pOpts...),
		node.WithPrivateKey(nodeKey),
		node.WithHostAddress(hostAddr),
		node.WithConnectionNotification(waku.connectionNotifChan),
		node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan),
		node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval),
		node.WithLogger(logger),
		node.WithLogLevel(logger.Level()),
		node.WithClusterID(cfg.ClusterID),
		node.WithMaxMsgSize(1024 * 1024),
	}

	if cfg.EnableDiscV5 {
		bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes)
		if err != nil {
			logger.Error("failed to get bootstrap nodes", zap.Error(err))
			return nil, err
		}
		opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate))
	}

	shards, err := protocol.TopicsToRelayShards(cfg.DefaultShardPubsubTopic)
	if err != nil {
		logger.Error("FATAL ERROR: failed to parse relay shards", zap.Error(err))
		return nil, errors.New("failed to parse relay shard, invalid pubsubTopic configuration")
	}
	if len(shards) == 0 { // Hack so that tests don't fail. TODO: Need to remove this once tests are changed to use proper cluster and shard.
		shardInfo := protocol.RelayShards{ClusterID: 0, ShardIDs: []uint16{0}}
		shards = append(shards, shardInfo)
	}
	waku.defaultShardInfo = shards[0]

	if cfg.LightClient {
		opts = append(opts, node.WithWakuFilterLightNode())
		waku.defaultShardInfo = shards[0]
		opts = append(opts, node.WithMaxPeerConnections(cfg.DiscoveryLimit))
	} else {
		relayOpts := []pubsub.Option{
			pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)),
		}
		if waku.logger.Level() == zap.DebugLevel {
			relayOpts = append(relayOpts, pubsub.WithEventTracer(waku))
		}

		opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...))
		opts = append(opts, node.WithMaxPeerConnections(maxRelayPeers))
		cfg.EnablePeerExchangeClient = true // Enabling this until discv5 issues are resolved. This will enable more peers to be connected for relay mesh.
	}

	if cfg.EnableStore {
		if appDB == nil {
			return nil, errors.New("appDB is required for store")
		}
		opts = append(opts, node.WithWakuStore())
		dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second))
		if err != nil {
			return nil, err
		}
		opts = append(opts, node.WithMessageProvider(dbStore))
	}

	if !cfg.LightClient {
		opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20)))
		opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1)))
	}

	if appDB != nil {
		waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
		if err != nil {
			return nil, err
		}
	}

	if cfg.EnablePeerExchangeServer {
		opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)))
	}

	waku.options = opts
	waku.logger.Info("setup the go-waku node successfully")

	return waku, nil
}
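
// SubscribeToConnStatusChanges registers and returns a new connection status subscription.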
func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription {
	w.connStatusMu.Lock()
	defer w.connStatusMu.Unlock()
	subscription := types.NewConnStatusSubscription()
	w.connStatusSubscriptions[subscription.ID] = subscription
	return subscription
}
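
// GetNodeENRString returns the node's ENR in its textual form, or an error if the node has not been started yet.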
func (w *Waku) GetNodeENRString() (string, error) {
	if w.node == nil {
		return "", errors.New("node not initialized")
	}
	return w.node.ENR().String(), nil
}
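
// getDiscV5BootstrapNodes resolves the discv5 bootstrap list: enrtree:// entries are resolved
// via DNS discovery, while any other entry is parsed as a plain ENR.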
func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) {
	wg := sync.WaitGroup{}
	mu := sync.Mutex{}
	var result []*enode.Node

	w.seededBootnodesForDiscV5 = true

	retrieveENR := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
		mu.Lock()
		defer mu.Unlock()
		defer wg.Done()
		if d.ENR != nil {
			result = append(result, d.ENR)
		}
	}

	for _, addrString := range addresses {
		if addrString == "" {
			continue
		}

		if strings.HasPrefix(addrString, "enrtree://") {
			// Use DNS Discovery
			wg.Add(1)
			go func(addr string) {
				defer wg.Done()
				if err := w.dnsDiscover(ctx, addr, retrieveENR); err != nil {
					mu.Lock()
					w.seededBootnodesForDiscV5 = false
					mu.Unlock()
				}
			}(addrString)
		} else {
			// It's a normal enr
			bootnode, err := enode.Parse(enode.ValidSchemes, addrString)
			if err != nil {
				return nil, err
			}
			result = append(result, bootnode)
		}
	}
	wg.Wait()

	return result, nil
}

type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup)
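
// dnsDiscover retrieves the nodes behind an enrtree:// address (caching the result) and applies the given function to each discovered node.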
func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error {
	w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress))
	ctx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	w.dnsAddressCacheLock.Lock()
	defer w.dnsAddressCacheLock.Unlock()

	discNodes, ok := w.dnsAddressCache[enrtreeAddress]
	if !ok {
		nameserver := w.cfg.Nameserver
		resolver := w.cfg.Resolver

		var opts []dnsdisc.DNSDiscoveryOption
		if nameserver != "" {
			opts = append(opts, dnsdisc.WithNameserver(nameserver))
		}
		if resolver != nil {
			opts = append(opts, dnsdisc.WithResolver(resolver))
		}

		discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress, opts...)
		if err != nil {
			w.logger.Warn("dns discovery error ", zap.Error(err))
			return err
		}

		if len(discoveredNodes) != 0 {
			w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...)
			discNodes = w.dnsAddressCache[enrtreeAddress]
		}
	}

	wg := &sync.WaitGroup{}
	wg.Add(len(discNodes))
	for _, d := range discNodes {
		apply(d, wg)
	}
	wg.Wait()

	return nil
}
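
// discoverAndConnectPeers connects to the configured WakuNodes, resolving enrtree:// entries
// via DNS discovery and treating every other entry as a multiaddress.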
func (w *Waku) discoverAndConnectPeers() error {
	fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
		defer wg.Done()
		if len(d.PeerInfo.Addrs) != 0 {
			go w.connect(d.PeerInfo, d.ENR, wps.DNSDiscovery)
		}
	}

	for _, addrString := range w.cfg.WakuNodes {
		addrString := addrString
		if strings.HasPrefix(addrString, "enrtree://") {
			// Use DNS Discovery
			go func() {
				if err := w.dnsDiscover(w.ctx, addrString, fnApply); err != nil {
					w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString))
				}
			}()
		} else {
			// It is a normal multiaddress
			addr, err := multiaddr.NewMultiaddr(addrString)
			if err != nil {
				w.logger.Warn("invalid peer multiaddress", zap.String("ma", addrString), zap.Error(err))
				continue
			}

			peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
			if err != nil {
				w.logger.Warn("invalid peer multiaddress", zap.Stringer("addr", addr), zap.Error(err))
				continue
			}

			go w.connect(*peerInfo, nil, wps.Static)
		}
	}

	return nil
}
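
// connect registers a discovered peer with the waku node so the peer connector can dial it.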
func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) {
	// Connection will be pruned eventually by the connection manager if needed.
	// The peer connector in go-waku uses Connect, so it will execute identify as part of its connection flow.
	w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, w.cfg.DefaultShardedPubsubTopics, enr, true)
}
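
// telemetryBandwidthStats periodically pushes relay and store bandwidth usage to the telemetry server, resetting the totals daily.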
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
	if telemetryServerURL == "" {
		return
	}

	telemetry := NewBandwidthTelemetryClient(w.logger, telemetryServerURL)

	ticker := time.NewTicker(time.Second * 20)
	defer ticker.Stop()

	today := time.Now()

	for {
		select {
		case <-w.ctx.Done():
			return
		case now := <-ticker.C:
			// Reset totals when day changes
			if now.Day() != today.Day() {
				today = now
				w.bandwidthCounter.Reset()
			}

			storeStats := w.bandwidthCounter.GetBandwidthForProtocol(legacy_store.StoreID_v20beta4)
			relayStats := w.bandwidthCounter.GetBandwidthForProtocol(relay.WakuRelayID_v200)
			go telemetry.PushProtocolStats(relayStats, storeStats)
		}
	}
}
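
// GetStats returns the current upload and download rates measured by the bandwidth counter.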
func (w *Waku) GetStats() types.StatsSummary {
	stats := w.bandwidthCounter.GetBandwidthTotals()
	return types.StatsSummary{
		UploadRate:   uint64(stats.RateOut),
		DownloadRate: uint64(stats.RateIn),
	}
}
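
// runPeerExchangeLoop periodically requests peers via peer exchange from nodes discovered through DNS discovery (light nodes only).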
func (w *Waku) runPeerExchangeLoop() {
	defer w.wg.Done()

	if !w.cfg.EnablePeerExchangeClient {
		// Currently the peer exchange client is only used for light nodes
		return
	}

	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()

	for {
		select {
		case <-w.ctx.Done():
			w.logger.Debug("Peer exchange loop stopped")
			return
		case <-ticker.C:
			w.logger.Info("Running peer exchange loop")

			// We select only the nodes discovered via DNS Discovery that support peer exchange.
			// We assume that those peers are running peer exchange according to the infra config;
			// if not, the peer selection process in go-waku will filter them out anyway.
			w.dnsAddressCacheLock.RLock()
			var peers []peer.ID
			for _, record := range w.dnsAddressCache {
				for _, discoveredNode := range record {
					if len(discoveredNode.PeerInfo.Addrs) == 0 {
						continue
					}
					// Attempt to connect to the peers.
					// Peers will be added to the libp2p peer store thanks to identify.
					go w.connect(discoveredNode.PeerInfo, discoveredNode.ENR, wps.DNSDiscovery)
					peers = append(peers, discoveredNode.PeerID)
				}
			}
			w.dnsAddressCacheLock.RUnlock()

			if len(peers) != 0 {
				err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...),
					peer_exchange.FilterByShard(int(w.defaultShardInfo.ClusterID), int(w.defaultShardInfo.ShardIDs[0])))
				if err != nil {
					w.logger.Error("couldnt request peers via peer exchange", zap.Error(err))
				}
			}
		}
	}
}
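
// GetPubsubTopic returns the pubsub topic to use, falling back to the default shard pubsub topic when none is given.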
func (w *Waku) GetPubsubTopic(topic string) string {
	if topic == "" || !w.cfg.UseShardAsDefaultTopic {
		topic = w.cfg.DefaultShardPubsubTopic
	}

	return topic
}

func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
	topic = w.GetPubsubTopic(topic)
	if !w.node.Relay().IsSubscribed(topic) {
		return nil
	}

	contentFilter := protocol.NewContentFilter(topic)
	return w.node.Relay().Unsubscribe(w.ctx, contentFilter)
}
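
// subscribeToPubsubTopicWithWakuRelay subscribes the relay to a pubsub topic, optionally installing a
// signed-topic validator, and forwards received envelopes to OnNewEnvelopes until the node is stopped.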
func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
	if w.cfg.LightClient {
		return errors.New("only available for full nodes")
	}

	topic = w.GetPubsubTopic(topic)

	if w.node.Relay().IsSubscribed(topic) {
		return nil
	}

	if pubkey != nil {
		err := w.node.Relay().AddSignedTopicValidator(topic, pubkey)
		if err != nil {
			return err
		}
	}

	contentFilter := protocol.NewContentFilter(topic)
	sub, err := w.node.Relay().Subscribe(w.ctx, contentFilter)
	if err != nil {
		return err
	}

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.ctx.Done():
				err := w.node.Relay().Unsubscribe(w.ctx, contentFilter)
				if err != nil && !errors.Is(err, context.Canceled) {
					w.logger.Error("could not unsubscribe", zap.Error(err))
				}
				return
			case env := <-sub[0].Ch:
				err := w.OnNewEnvelopes(env, common.RelayedMessageType, false)
				if err != nil {
					w.logger.Error("OnNewEnvelopes error", zap.Error(err))
				}
			}
		}
	}()

	return nil
}

// MaxMessageSize returns the maximum accepted message size.
func (w *Waku) MaxMessageSize() uint32 {
	return w.cfg.MaxMessageSize
}

// CurrentTime returns the current time.
func (w *Waku) CurrentTime() time.Time {
	return w.timesource.Now()
}

// APIs returns the RPC descriptors the Waku implementation offers
func (w *Waku) APIs() []rpc.API {
	return []rpc.API{
		{
			Namespace: Name,
			Version:   VersionStr,
			Service:   NewPublicWakuAPI(w),
			Public:    false,
		},
	}
}

// Protocols returns the waku sub-protocols run by this particular client.
func (w *Waku) Protocols() []p2p.Protocol {
	return []p2p.Protocol{}
}

func (w *Waku) SendEnvelopeEvent(event common.EnvelopeEvent) int {
	return w.envelopeFeed.Send(event)
}

// SubscribeEnvelopeEvents subscribes to envelopes feed.
// In order to prevent blocking waku producers events must be amply buffered.
func (w *Waku) SubscribeEnvelopeEvents(events chan<- common.EnvelopeEvent) event.Subscription {
	return w.envelopeFeed.Subscribe(events)
}

// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption. Returns ID of the new key pair.
func (w *Waku) NewKeyPair() (string, error) {
	key, err := crypto.GenerateKey()
	if err != nil || !validatePrivateKey(key) {
		key, err = crypto.GenerateKey() // retry once
	}
	if err != nil {
		return "", err
	}
	if !validatePrivateKey(key) {
		return "", fmt.Errorf("failed to generate valid key")
	}

	id, err := toDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
	if err != nil {
		return "", err
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.privateKeys[id] != nil {
		return "", fmt.Errorf("failed to generate unique ID")
	}
	w.privateKeys[id] = key
	return id, nil
}

// DeleteKeyPair deletes the specified key if it exists.
func (w *Waku) DeleteKeyPair(key string) bool {
	deterministicID, err := toDeterministicID(key, common.KeyIDSize)
	if err != nil {
		return false
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.privateKeys[deterministicID] != nil {
		delete(w.privateKeys, deterministicID)
		return true
	}
	return false
}

// AddKeyPair imports an asymmetric private key and returns its identifier.
func (w *Waku) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
	id, err := makeDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
	if err != nil {
		return "", err
	}
	if w.HasKeyPair(id) {
		return id, nil // no need to re-inject
	}

	w.keyMu.Lock()
	w.privateKeys[id] = key
	w.keyMu.Unlock()

	return id, nil
}

// SelectKeyPair adds a cryptographic identity, and makes sure
// that it is the only private key known to the node.
func (w *Waku) SelectKeyPair(key *ecdsa.PrivateKey) error {
	id, err := makeDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), common.KeyIDSize)
	if err != nil {
		return err
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	w.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store
	w.privateKeys[id] = key

	return nil
}

// DeleteKeyPairs removes all cryptographic identities known to the node
func (w *Waku) DeleteKeyPairs() error {
	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	w.privateKeys = make(map[string]*ecdsa.PrivateKey)

	return nil
}

// HasKeyPair checks if the waku node is configured with the private key
// of the specified public pair.
func (w *Waku) HasKeyPair(id string) bool {
	deterministicID, err := toDeterministicID(id, common.KeyIDSize)
	if err != nil {
		return false
	}

	w.keyMu.RLock()
	defer w.keyMu.RUnlock()
	return w.privateKeys[deterministicID] != nil
}

// GetPrivateKey retrieves the private key of the specified identity.
func (w *Waku) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
	deterministicID, err := toDeterministicID(id, common.KeyIDSize)
	if err != nil {
		return nil, err
	}

	w.keyMu.RLock()
	defer w.keyMu.RUnlock()

	key := w.privateKeys[deterministicID]
	if key == nil {
		return nil, fmt.Errorf("invalid id")
	}
	return key, nil
}

// GenerateSymKey generates a random symmetric key and stores it under id,
// which is then returned. Will be used in the future for session key exchange.
func (w *Waku) GenerateSymKey() (string, error) {
	key, err := common.GenerateSecureRandomData(common.AESKeyLength)
	if err != nil {
		return "", err
	} else if !common.ValidateDataIntegrity(key, common.AESKeyLength) {
		return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
	}

	id, err := common.GenerateRandomID()
	if err != nil {
		return "", fmt.Errorf("failed to generate ID: %s", err)
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.symKeys[id] != nil {
		return "", fmt.Errorf("failed to generate unique ID")
	}
	w.symKeys[id] = key
	return id, nil
}

// AddSymKey stores the key with a given id.
func (w *Waku) AddSymKey(id string, key []byte) (string, error) {
	deterministicID, err := toDeterministicID(id, common.KeyIDSize)
	if err != nil {
		return "", err
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.symKeys[deterministicID] != nil {
		return "", fmt.Errorf("key already exists: %v", id)
	}
	w.symKeys[deterministicID] = key
	return deterministicID, nil
}

// AddSymKeyDirect stores the key, and returns its id.
func (w *Waku) AddSymKeyDirect(key []byte) (string, error) {
	if len(key) != common.AESKeyLength {
		return "", fmt.Errorf("wrong key size: %d", len(key))
	}

	id, err := common.GenerateRandomID()
	if err != nil {
		return "", fmt.Errorf("failed to generate ID: %s", err)
	}

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.symKeys[id] != nil {
		return "", fmt.Errorf("failed to generate unique ID")
	}
	w.symKeys[id] = key
	return id, nil
}

// AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
func (w *Waku) AddSymKeyFromPassword(password string) (string, error) {
	id, err := common.GenerateRandomID()
	if err != nil {
		return "", fmt.Errorf("failed to generate ID: %s", err)
	}
	if w.HasSymKey(id) {
		return "", fmt.Errorf("failed to generate unique ID")
	}

	// kdf should run no less than 0.1 seconds on an average computer,
	// because it's a once-in-a-session experience
	derived := pbkdf2.Key([]byte(password), nil, 65356, common.AESKeyLength, sha256.New)

	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	// double check is necessary, because deriveKeyMaterial() is very slow
	if w.symKeys[id] != nil {
		return "", fmt.Errorf("critical error: failed to generate unique ID")
	}
	w.symKeys[id] = derived
	return id, nil
}

// HasSymKey returns true if there is a key associated with the given id.
// Otherwise returns false.
func (w *Waku) HasSymKey(id string) bool {
	w.keyMu.RLock()
	defer w.keyMu.RUnlock()

	return w.symKeys[id] != nil
}

// DeleteSymKey deletes the key associated with the name string if it exists.
func (w *Waku) DeleteSymKey(id string) bool {
	w.keyMu.Lock()
	defer w.keyMu.Unlock()

	if w.symKeys[id] != nil {
		delete(w.symKeys, id)
		return true
	}
	return false
}

// GetSymKey returns the symmetric key associated with the given id.
func (w *Waku) GetSymKey(id string) ([]byte, error) {
	w.keyMu.RLock()
	defer w.keyMu.RUnlock()

	if w.symKeys[id] != nil {
		return w.symKeys[id], nil
	}
	return nil, fmt.Errorf("non-existent key ID")
}

// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) {
	f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic)
	id, err := w.filters.Install(f)
	if err != nil {
		return id, err
	}

	if w.cfg.LightClient {
		w.filterManager.addFilter(id, f)
	}

	return id, nil
}

// Unsubscribe removes an installed message handler.
func (w *Waku) Unsubscribe(ctx context.Context, id string) error {
	ok := w.filters.Uninstall(id)
	if !ok {
		return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id)
	}

	if w.cfg.LightClient {
		w.filterManager.removeFilter(id)
	}

	return nil
}

// GetFilter returns the filter by id.
func (w *Waku) GetFilter(id string) *common.Filter {
	return w.filters.Get(id)
}

// UnsubscribeMany removes the installed message handlers with the given ids.
func (w *Waku) UnsubscribeMany(ids []string) error {
	for _, id := range ids {
		w.logger.Info("cleaning up filter", zap.String("id", id))
		ok := w.filters.Uninstall(id)
		if !ok {
			w.logger.Warn("could not remove filter with id", zap.String("id", id))
		}
	}
	return nil
}
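
// SkipPublishToTopic toggles a testing-only mode in which publishing is skipped and always fails, simulating an offline node.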
func (w *Waku) SkipPublishToTopic(value bool) {
	w.cfg.SkipPublishToTopic = value
}

type PublishMethod int

const (
	LightPush PublishMethod = iota
	Relay
)

func (pm PublishMethod) String() string {
	switch pm {
	case LightPush:
		return "LightPush"
	case Relay:
		return "Relay"
	default:
		return "Unknown"
	}
}
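
// broadcast consumes the send queue and publishes each envelope via relay or lightpush,
// reporting the outcome to the telemetry client when one is set.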
func (w *Waku) broadcast() {
	for {
		select {
		case envelope := <-w.sendQueue:
			logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))

			var fn publishFn
			var publishMethod PublishMethod

			if w.cfg.SkipPublishToTopic {
				// For now only used in testing to simulate going offline
				publishMethod = LightPush
				fn = func(env *protocol.Envelope, logger *zap.Logger) error {
					return errors.New("test send failure")
				}
			} else if w.cfg.LightClient {
				publishMethod = LightPush
				fn = func(env *protocol.Envelope, logger *zap.Logger) error {
					logger.Info("publishing message via lightpush")
					_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
					return err
				}
			} else {
				publishMethod = Relay
				fn = func(env *protocol.Envelope, logger *zap.Logger) error {
					peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
					logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
					_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
					return err
				}
			}

			// Wraps the publish function with a call to the telemetry client
			if w.statusTelemetryClient != nil {
				sendFn := fn
				fn = func(env *protocol.Envelope, logger *zap.Logger) error {
					err := sendFn(env, logger)
					if err == nil {
						w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
					} else {
						w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
					}
					return err
				}
			}

			w.wg.Add(1)
			go w.publishEnvelope(envelope, fn, logger)
		case <-w.ctx.Done():
			return
		}
	}
}
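
// checkIfMessagesStored periodically queries the store node for the hashes of recently sent messages
// to confirm that they were stored.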
func (w *Waku) checkIfMessagesStored() {
	ticker := time.NewTicker(hashQueryInterval)
	defer ticker.Stop()

	for {
		select {
		case <-w.ctx.Done():
			w.logger.Debug("stop the look for message stored check")
			return
		case <-ticker.C:
			w.sendMsgIDsMu.Lock()
			w.logger.Debug("running loop for messages stored check", zap.Any("messageIds", w.sendMsgIDs))
			pubsubTopics := make([]string, 0, len(w.sendMsgIDs))
			pubsubMessageIds := make([][]gethcommon.Hash, 0, len(w.sendMsgIDs))
			pubsubMessageTime := make([][]uint32, 0, len(w.sendMsgIDs))
			for pubsubTopic, subMsgs := range w.sendMsgIDs {
				var queryMsgIds []gethcommon.Hash
				var queryMsgTime []uint32
				for msgID, sendTime := range subMsgs {
					if len(queryMsgIds) >= maxHashQueryLength {
						break
					}
					// If the message was sent more than messageSentPeriod seconds ago, check whether it has been stored
					if uint32(w.timesource.Now().Unix()) > sendTime+messageSentPeriod {
						queryMsgIds = append(queryMsgIds, msgID)
						queryMsgTime = append(queryMsgTime, sendTime)
					}
				}
				w.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic))
				if len(queryMsgIds) > 0 {
					pubsubTopics = append(pubsubTopics, pubsubTopic)
					pubsubMessageIds = append(pubsubMessageIds, queryMsgIds)
					pubsubMessageTime = append(pubsubMessageTime, queryMsgTime)
				}
			}
			w.sendMsgIDsMu.Unlock()

			pubsubProcessedMessages := make([][]gethcommon.Hash, len(pubsubTopics))
			for i, pubsubTopic := range pubsubTopics {
				processedMessages := w.messageHashBasedQuery(w.ctx, pubsubMessageIds[i], pubsubMessageTime[i], pubsubTopic)
				pubsubProcessedMessages[i] = processedMessages
			}

			w.sendMsgIDsMu.Lock()
			for i, pubsubTopic := range pubsubTopics {
				subMsgs, ok := w.sendMsgIDs[pubsubTopic]
				if !ok {
					continue
				}
				for _, hash := range pubsubProcessedMessages[i] {
					delete(subMsgs, hash)
					if len(subMsgs) == 0 {
						delete(w.sendMsgIDs, pubsubTopic)
					} else {
						w.sendMsgIDs[pubsubTopic] = subMsgs
					}
				}
			}
			w.logger.Debug("messages for next store hash query", zap.Any("messageIds", w.sendMsgIDs))
			w.sendMsgIDsMu.Unlock()
		}
	}
}
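
// ConfirmMessageDelivered removes the given message hashes from the set of messages awaiting store confirmation.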
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
	w.sendMsgIDsMu.Lock()
	defer w.sendMsgIDsMu.Unlock()
	for pubsubTopic, subMsgs := range w.sendMsgIDs {
		for _, hash := range hashes {
			delete(subMsgs, hash)
			if len(subMsgs) == 0 {
				delete(w.sendMsgIDs, pubsubTopic)
			} else {
				w.sendMsgIDs[pubsubTopic] = subMsgs
			}
		}
	}
}
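
// SetStorePeerID sets the peer used for store queries when verifying that sent messages were stored.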
func (w *Waku) SetStorePeerID(peerID peer.ID) {
	w.storePeerID = peerID
}

type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error
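
// publishEnvelope runs the given publish function and emits an expired-envelope event if publishing fails.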
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {
	defer w.wg.Done()

	if err := publishFn(envelope, logger); err != nil {
		logger.Error("could not send message", zap.Error(err))
		w.SendEnvelopeEvent(common.EnvelopeEvent{
			Hash:  gethcommon.BytesToHash(envelope.Hash().Bytes()),
			Event: common.EventEnvelopeExpired,
		})
	}
}

// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
	pubsubTopic = w.GetPubsubTopic(pubsubTopic)
	if w.protectedTopicStore != nil {
		privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
		if err != nil {
			return nil, err
		}

		if privKey != nil {
			err = relay.SignMessage(privKey, msg, pubsubTopic)
			if err != nil {
				return nil, err
			}
		}
	}

	envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic)

	w.sendQueue <- envelope

	w.poolMu.Lock()
	alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes()))
	w.poolMu.Unlock()
	if !alreadyCached {
		recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType)
		w.postEvent(recvMessage) // notify the local node about the new message
		w.addEnvelope(recvMessage)
	}

	return envelope.Hash().Bytes(), nil
}

// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
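// messageHashBasedQuery asks the store peer which of the given message hashes it holds; hashes that are found
// are reported as sent, while hashes still missing after messageExpiredPerid seconds are reported as expired.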
func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, relayTime []uint32, pubsubTopic string) []gethcommon.Hash {
	selectedPeer := w.storePeerID
	if selectedPeer == "" {
		w.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
		return []gethcommon.Hash{}
	}

	var opts []store.RequestOption
	requestID := protocol.GenerateRequestID()
	opts = append(opts, store.WithRequestID(requestID))
	opts = append(opts, store.WithPeer(selectedPeer))
	opts = append(opts, store.WithPaging(false, maxHashQueryLength))
	opts = append(opts, store.IncludeData(false))

	messageHashes := make([]pb.MessageHash, len(hashes))
	for i, hash := range hashes {
		messageHashes[i] = pb.ToMessageHash(hash.Bytes())
	}

	w.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))

	result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...)
	if err != nil {
		w.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
		return []gethcommon.Hash{}
	}

	w.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result.Messages())))

	var ackHashes []gethcommon.Hash
	var missedHashes []gethcommon.Hash
	for i, hash := range hashes {
		found := false
		for _, msg := range result.Messages() {
			if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) {
				found = true
				break
			}
		}

		if found {
			ackHashes = append(ackHashes, hash)
			w.SendEnvelopeEvent(common.EnvelopeEvent{
				Hash:  hash,
				Event: common.EventEnvelopeSent,
			})
		}

		if !found && uint32(w.timesource.Now().Unix()) > relayTime[i]+messageExpiredPerid {
			missedHashes = append(missedHashes, hash)
			w.SendEnvelopeEvent(common.EnvelopeEvent{
				Hash:  hash,
				Event: common.EventEnvelopeExpired,
			})
		}
	}

	w.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
	w.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))

	return append(ackHashes, missedHashes...)
}
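
// Query performs a legacy store query against the given peer and passes every returned message to
// OnNewEnvelopes, returning the next cursor and the number of messages received.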
func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Query, cursor *storepb.Index, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (*storepb.Index, int, error) {
	requestID := protocol.GenerateRequestID()

	opts = append(opts,
		legacy_store.WithRequestID(requestID),
		legacy_store.WithPeer(peerID),
		legacy_store.WithCursor(cursor))

	logger := w.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))

	logger.Debug("store.query",
		logutils.WakuMessageTimestamp("startTime", query.StartTime),
		logutils.WakuMessageTimestamp("endTime", query.EndTime),
		zap.Strings("contentTopics", query.ContentTopics),
		zap.String("pubsubTopic", query.PubsubTopic),
		zap.Stringer("cursor", cursor),
	)

	queryStart := time.Now()
	result, err := w.node.LegacyStore().Query(ctx, query, opts...)
	queryDuration := time.Since(queryStart)
	if err != nil {
		logger.Error("error querying storenode", zap.Error(err))

		if w.onHistoricMessagesRequestFailed != nil {
			w.onHistoricMessagesRequestFailed(requestID, peerID, err)
		}
		return nil, 0, err
	}

	logger.Debug("store.query response",
		zap.Duration("queryDuration", queryDuration),
		zap.Int("numMessages", len(result.Messages)),
		zap.Stringer("cursor", result.Cursor()))

	for _, msg := range result.Messages {
		// Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending
		// See https://github.com/vacp2p/rfc/issues/563
		msg.RateLimitProof = nil

		envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic)
		logger.Info("received waku2 store message",
			zap.Stringer("envelopeHash", envelope.Hash()),
			zap.String("pubsubTopic", query.PubsubTopic),
			zap.Int64p("timestamp", envelope.Message().Timestamp),
		)

		err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes)
		if err != nil {
			return nil, 0, err
		}
	}

	return result.Cursor(), len(result.Messages), nil
}
// Start implements node.Service, starting the background data propagation thread
// of the Waku protocol.
2021-06-30 11:40:54 +00:00
func ( w * Waku ) Start ( ) error {
2023-11-17 14:31:43 +00:00
if w . ctx == nil {
w . ctx , w . cancel = context . WithCancel ( context . Background ( ) )
}
2023-04-14 16:08:06 +00:00
var err error
2024-02-27 09:24:34 +00:00
if w . node , err = node . New ( w . options ... ) ; err != nil {
2023-04-14 16:08:06 +00:00
return fmt . Errorf ( "failed to create a go-waku node: %v" , err )
}
2024-07-11 18:36:34 +00:00
w . goingOnline = make ( chan struct { } )
2023-04-17 15:45:45 +00:00
2023-11-17 13:11:01 +00:00
if err = w . node . Start ( w . ctx ) ; err != nil {
2023-04-14 16:08:06 +00:00
return fmt . Errorf ( "failed to start go-waku node: %v" , err )
}
2023-08-25 17:02:13 +00:00
w . logger . Info ( "WakuV2 PeerID" , zap . Stringer ( "id" , w . node . Host ( ) . ID ( ) ) )
2024-02-27 09:24:34 +00:00
if err = w . discoverAndConnectPeers ( ) ; err != nil {
2023-04-14 16:08:06 +00:00
return fmt . Errorf ( "failed to add wakuv2 peers: %v" , err )
}
2024-02-27 09:24:34 +00:00
if w . cfg . EnableDiscV5 {
2023-11-17 13:11:01 +00:00
err := w . node . DiscV5 ( ) . Start ( w . ctx )
2023-04-14 16:08:06 +00:00
if err != nil {
return err
}
}
2024-07-09 13:42:34 +00:00
w . wg . Add ( 2 )
2023-04-14 16:08:06 +00:00
go func ( ) {
defer w . wg . Done ( )
2024-06-14 16:35:05 +00:00
2023-04-14 16:08:06 +00:00
for {
select {
2023-07-17 17:20:55 +00:00
case <- w . ctx . Done ( ) :
2023-04-14 16:08:06 +00:00
return
2024-07-11 18:36:34 +00:00
case <- w . topicHealthStatusChan :
// TODO: https://github.com/status-im/status-go/issues/4628
case <- w . connectionNotifChan :
isOnline := len ( w . node . Host ( ) . Network ( ) . Peers ( ) ) > 0
2024-06-14 12:41:45 +00:00
if w . cfg . LightClient {
2024-07-11 18:36:34 +00:00
// TODO: Temporary changes for lightNodes to have health check based on connected peers.
//This needs to be enhanced to be based on healthy Filter and lightPush peers available for each shard.
//This would get fixed as part of https://github.com/waku-org/go-waku/issues/1114
subs := w . node . FilterLightnode ( ) . Subscriptions ( )
w . logger . Debug ( "filter subs count" , zap . Int ( "count" , len ( subs ) ) )
//TODO: needs fixing, right now invoking everytime.
//Trigger FilterManager to take care of any pending filter subscriptions
//TODO: Pass pubsubTopic based on topicHealth notif received.
go w . filterManager . onConnectionStatusChange ( w . cfg . DefaultShardPubsubTopic , isOnline )
2024-06-14 16:35:05 +00:00
}

				w.connStatusMu.Lock()

				latestConnStatus := types.ConnStatus{
					IsOnline: isOnline,
					Peers:    FormatPeerStats(w.node),
				}

				w.logger.Debug("peer stats",
					zap.Int("peersCount", len(latestConnStatus.Peers)),
					zap.Any("stats", latestConnStatus))

				for k, subs := range w.connStatusSubscriptions {
					if !subs.Send(latestConnStatus) {
						delete(w.connStatusSubscriptions, k)
					}
				}

				w.connStatusMu.Unlock()

				if w.onPeerStats != nil {
					w.onPeerStats(latestConnStatus)
				}

				// TODO: analyze if we need to discover and connect to peers with the peerExchange loop enabled.
				if !w.onlineChecker.IsOnline() && isOnline {
					if err := w.discoverAndConnectPeers(); err != nil {
						w.logger.Error("failed to add wakuv2 peers", zap.Error(err))
					}
				}

				w.ConnectionChanged(connection.State{
					Offline: !latestConnStatus.IsOnline,
				})
			}
		}
	}()

	go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)

	// TODO: commenting for now so that only fleet nodes are used.
	// Need to uncomment once filter peer scoring etc. is implemented.
	go w.runPeerExchangeLoop()

	if w.cfg.EnableMissingMessageVerification {
		w.wg.Add(1)
		go w.checkForMissingMessages()
	}

	if w.cfg.LightClient {
		// Create a FilterManager that will maintain peer connectivity
		// for installed filters
		w.filterManager = newFilterManager(w.ctx, w.logger, w.cfg,
			func(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) },
			w.node.FilterLightnode())
	}

	err = w.setupRelaySubscriptions()
	if err != nil {
		return err
	}

	numCPU := runtime.NumCPU()
	for i := 0; i < numCPU; i++ {
		go w.processQueueLoop()
	}

	go w.broadcast()

	go w.checkIfMessagesStored()

	// we should wait for `seedBootnodesForDiscV5` to shut down smoothly before setting w.ctx to nil within `w.Stop()`
	w.wg.Add(1)
	go w.seedBootnodesForDiscV5()

	return nil
}
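
// setupRelaySubscriptions subscribes the relay to the default shard pubsub topic
// and to any protected topics found in the protected topic store. It is a no-op
// for light clients, which do not run relay.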
func (w *Waku) setupRelaySubscriptions() error {
	if w.cfg.LightClient {
		return nil
	}

	if w.protectedTopicStore != nil {
		protectedTopics, err := w.protectedTopicStore.ProtectedTopics()
		if err != nil {
			return err
		}

		for _, pt := range protectedTopics {
			// Adding subscription to protected topics
			err = w.subscribeToPubsubTopicWithWakuRelay(pt.Topic, pt.PubKey)
			if err != nil {
				return err
			}
		}
	}

	err := w.subscribeToPubsubTopicWithWakuRelay(w.cfg.DefaultShardPubsubTopic, nil)
	if err != nil {
		return err
	}

	return nil
}

// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
	w.cancel()

	w.envelopeCache.Stop()

	w.node.Stop()

	if w.protectedTopicStore != nil {
		err := w.protectedTopicStore.Close()
		if err != nil {
			return err
		}
	}

	close(w.goingOnline)

	w.wg.Wait()

	w.ctx = nil
	w.cancel = nil

	return nil
}
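
// OnNewEnvelopes processes an incoming envelope of the given message type: it
// wraps it in a ReceivedMessage, optionally reports it to the telemetry client,
// and adds it to the local cache and processing pipeline. A nil envelope is ignored.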
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
	if envelope == nil {
		return nil
	}

	recvMessage := common.NewReceivedMessage(envelope, msgType)
	if recvMessage == nil {
		return nil
	}

	if w.statusTelemetryClient != nil {
		w.statusTelemetryClient.PushReceivedEnvelope(envelope)
	}

	logger := w.logger.With(
		zap.Any("messageType", msgType),
		zap.Stringer("envelopeHash", envelope.Hash()),
		zap.String("contentTopic", envelope.Message().ContentTopic),
		zap.Int64("timestamp", envelope.Message().GetTimestamp()),
	)

	logger.Debug("received new envelope")
	trouble := false

	_, err := w.add(recvMessage, processImmediately)
	if err != nil {
		logger.Info("invalid envelope received", zap.Error(err))
		trouble = true
	}

	common.EnvelopesValidatedCounter.Inc()

	if trouble {
		return errors.New("received invalid envelope")
	}

	return nil
}

// addEnvelope adds an envelope to the envelope cache, used for sending
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
	w.poolMu.Lock()
	w.envelopeCache.Set(envelope.Hash(), envelope, ttlcache.DefaultTTL)
	w.poolMu.Unlock()
}
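
// add caches the received message (unless it is already cached) and either
// processes it immediately or posts it to the message queue for asynchronous
// processing, skipping messages that have already been processed.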
func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) {
	common.EnvelopesReceivedCounter.Inc()

	w.poolMu.Lock()
	envelope := w.envelopeCache.Get(recvMessage.Hash())
	alreadyCached := envelope != nil
	w.poolMu.Unlock()

	if !alreadyCached {
		recvMessage.Processed.Store(false)
		w.addEnvelope(recvMessage)
	}

	logger := w.logger.With(zap.String("envelopeHash", recvMessage.Hash().Hex()))
	if alreadyCached {
		logger.Debug("w envelope already cached")
		common.EnvelopesCachedCounter.WithLabelValues("hit").Inc()
	} else {
		logger.Debug("cached w envelope")
		common.EnvelopesCachedCounter.WithLabelValues("miss").Inc()
		common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload)))
	}

	if !alreadyCached || !envelope.Value().Processed.Load() {
		if processImmediately {
			logger.Debug("immediately processing envelope")
			w.processMessage(recvMessage)
		} else {
			logger.Debug("posting event")
			w.postEvent(recvMessage) // notify the local node about the new message
		}
	}

	return true, nil
}

// postEvent queues the message for further processing.
func (w *Waku) postEvent(envelope *common.ReceivedMessage) {
	w.msgQueue <- envelope
}

// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueueLoop() {
	if w.ctx == nil {
		return
	}

	for {
		select {
		case <-w.ctx.Done():
			return
		case e := <-w.msgQueue:
			w.processMessage(e)
		}
	}
}
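
// processMessage matches a received message against the installed filters,
// tracks store/send message IDs for confirmation handling, and emits an
// EventEnvelopeAvailable event on the envelope feed.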
func (w *Waku) processMessage(e *common.ReceivedMessage) {
	logger := w.logger.With(
		zap.Stringer("envelopeHash", e.Envelope.Hash()),
		zap.String("pubsubTopic", e.PubsubTopic),
		zap.String("contentTopic", e.ContentTopic.ContentTopic()),
		zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()),
	)

	if e.MsgType == common.StoreMessageType {
		// We need to insert it first, and then remove it if not matched,
		// as messages are processed asynchronously
		w.storeMsgIDsMu.Lock()
		w.storeMsgIDs[e.Hash()] = true
		w.storeMsgIDsMu.Unlock()
	}

	ephemeral := e.Envelope.Message().Ephemeral
	if e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) {
		w.sendMsgIDsMu.Lock()
		subMsgs, ok := w.sendMsgIDs[e.PubsubTopic]
		if !ok {
			subMsgs = make(map[gethcommon.Hash]uint32)
		}
		subMsgs[e.Hash()] = e.Sent
		w.sendMsgIDs[e.PubsubTopic] = subMsgs
		w.sendMsgIDsMu.Unlock()
	}

	matched := w.filters.NotifyWatchers(e)

	// If not matched we remove it
	if !matched {
		logger.Debug("filters did not match")
		w.storeMsgIDsMu.Lock()
		delete(w.storeMsgIDs, e.Hash())
		w.storeMsgIDsMu.Unlock()
	} else {
		logger.Debug("filters did match")
		e.Processed.Store(true)
	}

	w.envelopeFeed.Send(common.EnvelopeEvent{
		Topic: e.ContentTopic,
		Hash:  e.Hash(),
		Event: common.EventEnvelopeAvailable,
	})
}

// GetEnvelope retrieves an envelope from the message queue by its hash.
// It returns nil if the envelope can not be found.
func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.ReceivedMessage {
	w.poolMu.RLock()
	defer w.poolMu.RUnlock()

	envelope := w.envelopeCache.Get(hash)
	if envelope == nil {
		return nil
	}

	return envelope.Value()
}

// IsEnvelopeCached checks if an envelope with the specific hash has already been received and cached.
func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
	w.poolMu.Lock()
	defer w.poolMu.Unlock()

	return w.envelopeCache.Has(hash)
}

func (w *Waku) ClearEnvelopesCache() {
	w.poolMu.Lock()
	defer w.poolMu.Unlock()

	w.envelopeCache.Stop()
	w.envelopeCache = newTTLCache()
}

func (w *Waku) PeerCount() int {
	return w.node.PeerCount()
}

func (w *Waku) Peers() map[string]types.WakuV2Peer {
	return FormatPeerStats(w.node)
}
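
// RelayPeersByTopic returns the full-mesh and known peers for a given pubsub
// topic. It is only available in relay mode; light clients get an error.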
func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) {
	if w.cfg.LightClient {
		return nil, errors.New("only available in relay mode")
	}

	return &types.PeerList{
		FullMeshPeers: w.node.Relay().PubSub().MeshPeers(topic),
		AllPeers:      w.node.Relay().PubSub().ListPeers(topic),
	}, nil
}

func (w *Waku) ListenAddresses() []string {
	addrs := w.node.ListenAddresses()
	var result []string
	for _, addr := range addrs {
		result = append(result, addr.String())
	}
	return result
}
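
// ENR returns the node's Ethereum Node Record as a string, or an error if no
// record is available.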
func (w *Waku) ENR() (string, error) {
	enr := w.node.ENR()
	if enr == nil {
		return "", errors.New("enr not available")
	}
	return enr.String(), nil
}
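
// SubscribeToPubsubTopic subscribes the relay to the given pubsub topic,
// optionally protected by the provided public key. It is a no-op for light clients.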
func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error {
	topic = w.GetPubsubTopic(topic)

	if !w.cfg.LightClient {
		err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey)
		if err != nil {
			return err
		}
	}
	return nil
}
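
// UnsubscribeFromPubsubTopic removes the relay subscription for the given
// pubsub topic. It is a no-op for light clients.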
func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
	topic = w.GetPubsubTopic(topic)

	if !w.cfg.LightClient {
		err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic)
		if err != nil {
			return err
		}
	}
	return nil
}
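
// RetrievePubsubTopicKey returns the private key stored for a protected pubsub
// topic, or nil if no protected topic store is configured.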
func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
	topic = w.GetPubsubTopic(topic)
	if w.protectedTopicStore == nil {
		return nil, nil
	}

	return w.protectedTopicStore.FetchPrivateKey(topic)
}
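
// StorePubsubTopicKey persists the private key associated with a protected
// pubsub topic. It is a no-op if no protected topic store is configured.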
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
	topic = w.GetPubsubTopic(topic)
	if w.protectedTopicStore == nil {
		return nil
	}

	return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey)
}
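
// RemovePubsubTopicKey deletes the stored key for a protected pubsub topic.
// It is a no-op if no protected topic store is configured.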
func (w *Waku) RemovePubsubTopicKey(topic string) error {
	topic = w.GetPubsubTopic(topic)
	if w.protectedTopicStore == nil {
		return nil
	}
	return w.protectedTopicStore.Delete(topic)
}
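
// StartDiscV5 starts the DiscV5 ambient peer discovery protocol, if it has been set up.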
func (w *Waku) StartDiscV5() error {
	if w.node.DiscV5() == nil {
		return errors.New("discv5 is not setup")
	}

	return w.node.DiscV5().Start(w.ctx)
}
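
// StopDiscV5 stops the DiscV5 discovery protocol, if it has been set up.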
func (w *Waku) StopDiscV5() error {
	if w.node.DiscV5() == nil {
		return errors.New("discv5 is not setup")
	}

	w.node.DiscV5().Stop()
	return nil
}
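
// ConnectionChanged is called whenever the client's connectivity state changes.
// When the node comes back online it nudges the bootnode seeding loop via the
// goingOnline channel and updates the online checker.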
func (w *Waku) ConnectionChanged(state connection.State) {
	if !state.Offline && !w.onlineChecker.IsOnline() {
		select {
		case w.goingOnline <- struct{}{}:
		default:
			w.logger.Warn("could not write on connection changed channel")
		}
	}

	w.onlineChecker.SetOnline(!state.Offline)
}

// seedBootnodesForDiscV5 tries to fetch bootnodes
// from an ENR periodically.
// It backs off exponentially until maxRetries, at which point it restarts from 0.
// It also restarts if there's a connection change signalled from the client.
func (w *Waku) seedBootnodesForDiscV5() {
	if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil {
		w.wg.Done()
		return
	}

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	var retries = 0

	now := func() int64 {
		return time.Now().UnixNano() / int64(time.Millisecond)
	}

	var lastTry = now()

	canQuery := func() bool {
		backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries)))
		return lastTry+backoff < now()
	}

	for {
		select {
		case <-ticker.C:
			if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 {
				w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers())))
				continue
			}

			if canQuery() {
				w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
				err := w.restartDiscV5()
				if err != nil {
					w.logger.Warn("failed to restart discv5", zap.Error(err))
				}

				lastTry = now()
				retries++
				// We reset the retries after a while and restart
				if retries > bootnodesMaxRetries {
					retries = 0
				}
			} else {
				w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries))
			}

		// If we go online, trigger immediately
		case <-w.goingOnline:
			if w.cfg.EnableDiscV5 {
				if canQuery() {
					err := w.restartDiscV5()
					if err != nil {
						w.logger.Warn("failed to restart discv5", zap.Error(err))
					}
				}
				retries = 0
				lastTry = now()
			}

		case <-w.ctx.Done():
			w.wg.Done()
			w.logger.Debug("bootnode seeding stopped")
			return
		}
	}
}

// Restart discv5, re-retrieving bootstrap nodes
func (w *Waku) restartDiscV5() error {
	ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second)
	defer cancel()

	bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes)
	if err != nil {
		return err
	}

	if len(bootnodes) == 0 {
		return errors.New("failed to fetch bootnodes")
	}

	if w.node.DiscV5().ErrOnNotRunning() != nil {
		w.logger.Info("discv5 is not running, starting it")
		err := w.node.DiscV5().Start(w.ctx)
		if err != nil {
			w.logger.Error("Could not start DiscV5", zap.Error(err))
		}
	} else {
		w.node.DiscV5().Stop()
		w.logger.Info("discv5 is running, restarting it")

		select {
		case <-w.ctx.Done(): // Don't start discv5 if we are stopping waku
			return nil
		default:
		}

		err := w.node.DiscV5().Start(w.ctx)
		if err != nil {
			w.logger.Error("Could not start DiscV5", zap.Error(err))
		}
	}

	w.logger.Info("restarting discv5 with nodes", zap.Any("nodes", bootnodes))
	return w.node.SetDiscV5Bootnodes(bootnodes)
}
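
// AddStorePeer adds a static store node by multiaddress and returns its peer ID.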
func (w *Waku) AddStorePeer(address string) (peer.ID, error) {
	addr, err := multiaddr.NewMultiaddr(address)
	if err != nil {
		return "", err
	}

	peerID, err := w.node.AddPeer(addr, wps.Static, []string{}, legacy_store.StoreID_v20beta4)
	if err != nil {
		return "", err
	}

	return peerID, nil
}

func (w *Waku) timestamp() int64 {
	return w.timesource.Now().UnixNano()
}
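
// AddRelayPeer adds a static relay node by multiaddress and returns its peer ID.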
func (w *Waku) AddRelayPeer(address string) (peer.ID, error) {
	addr, err := multiaddr.NewMultiaddr(address)
	if err != nil {
		return "", err
	}

	peerID, err := w.node.AddPeer(addr, wps.Static, []string{}, relay.WakuRelayID_v200)
	if err != nil {
		return "", err
	}

	return peerID, nil
}
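
// DialPeer dials a peer by multiaddress, bounded by the request timeout.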
func (w *Waku) DialPeer(address string) error {
	ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
	defer cancel()
	return w.node.DialPeer(ctx, address)
}

func (w *Waku) DialPeerByID(peerID string) error {
	ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
	defer cancel()

	pid, err := peer.Decode(peerID)
	if err != nil {
		return err
	}
	return w.node.DialPeerByID(ctx, pid)
}

func (w *Waku) DropPeer(peerID string) error {
	pid, err := peer.Decode(peerID)
	if err != nil {
		return err
	}
	return w.node.ClosePeerById(pid)
}
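
// ProcessingP2PMessages reports whether there are received store messages that
// have not yet been marked as processed.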
func (w *Waku) ProcessingP2PMessages() bool {
	w.storeMsgIDsMu.Lock()
	defer w.storeMsgIDsMu.Unlock()
	return len(w.storeMsgIDs) != 0
}
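
// MarkP2PMessageAsProcessed removes a message hash from the set of pending store messages.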
func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) {
	w.storeMsgIDsMu.Lock()
	defer w.storeMsgIDsMu.Unlock()
	delete(w.storeMsgIDs, hash)
}
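
// Clean resets the message queue and clears the message stores of all installed filters.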
func (w *Waku) Clean() error {
	w.msgQueue = make(chan *common.ReceivedMessage, messageQueueLimit)
	for _, f := range w.filters.All() {
		f.Messages = common.NewMemoryMessageStore()
	}
	return nil
}

func (w *Waku) PeerID() peer.ID {
	return w.node.Host().ID()
}

func (w *Waku) Peerstore() peerstore.Peerstore {
	return w.node.Host().Peerstore()
}

// validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool {
	if k == nil || k.D == nil || k.D.Sign() == 0 {
		return false
	}
	return common.ValidatePublicKey(&k.PublicKey)
}

// makeDeterministicID generates a deterministic ID, based on a given input
func makeDeterministicID(input string, keyLen int) (id string, err error) {
	buf := pbkdf2.Key([]byte(input), nil, 4096, keyLen, sha256.New)
	if !common.ValidateDataIntegrity(buf, common.KeyIDSize) {
		return "", fmt.Errorf("error in GenerateDeterministicID: failed to generate key")
	}
	id = gethcommon.Bytes2Hex(buf)
	return id, err
}

// toDeterministicID reviews the incoming id and transforms it into the format
// expected internally by the private key store. Originally, public keys
// were used as keys; now random keys are used. To make it easier to consume,
// both random IDs and public keys are accepted.
func toDeterministicID(id string, expectedLen int) (string, error) {
	if len(id) != (expectedLen * 2) { // we received hex key, so number of chars in id is doubled
		var err error
		id, err = makeDeterministicID(id, expectedLen)
		if err != nil {
			return "", err
		}
	}
	return id, nil
}
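
// FormatPeerStats converts the node's per-peer protocol stats into the
// types.WakuV2Peer map exposed to API clients, encapsulating each address
// with the peer's /p2p component.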
func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer {
	p := make(map[string]types.WakuV2Peer)
	for k, v := range wakuNode.PeerStats() {
		peerInfo := wakuNode.Host().Peerstore().PeerInfo(k)
		wakuV2Peer := types.WakuV2Peer{}
		wakuV2Peer.Protocols = v
		hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", k.String()))
		for _, addr := range peerInfo.Addrs {
			wakuV2Peer.Addresses = append(wakuV2Peer.Addresses, addr.Encapsulate(hostInfo).String())
		}
		p[k.String()] = wakuV2Peer
	}
	return p
}
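
// StoreNode returns the node's legacy store protocol instance.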
func (w *Waku) StoreNode() legacy_store.Store {
	return w.node.LegacyStore()
}