diff --git a/third_party/nwaku b/third_party/nwaku index c861fa9f7..c5a825e20 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit c861fa9f7560068874570598c81b7a1425a9e931 +Subproject commit c5a825e206c1cb3e6e0cf8c01410527804cb76c4 diff --git a/wakuv2/api.go b/wakuv2/api.go index 1008c6cbb..7dbd13221 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -515,4 +515,4 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { api.mu.Unlock() return id, nil -} */ \ No newline at end of file +} */ diff --git a/wakuv2/history_processor_wrapper.go b/wakuv2/history_processor_wrapper.go index 25cee9e8f..bdb72745c 100644 --- a/wakuv2/history_processor_wrapper.go +++ b/wakuv2/history_processor_wrapper.go @@ -3,7 +3,6 @@ package wakuv2 import ( "github.com/libp2p/go-libp2p/core/peer" - "github.com/status-im/status-go/wakuv2/common" "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/protocol" ) @@ -17,7 +16,9 @@ func NewHistoryProcessorWrapper(waku *Waku) history.HistoryProcessor { } func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error { - return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) + // TODO-nwaku + // return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes) + return nil } func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 93543bc6e..90dce0cae 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,5 +1,6 @@ package wakuv2 +/* TODO-nwaku import ( "errors" @@ -116,3 +117,4 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { }) } } +*/ diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index a1d703798..f121343e8 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -270,7 +270,6 @@ import ( "crypto/ecdsa" "crypto/sha256" "database/sql" - "encoding/hex" "encoding/json" "errors" "fmt" @@ -281,43 +280,47 @@ import ( "time" "unsafe" + "github.com/jellydator/ttlcache/v3" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/multiformats/go-multiaddr" + + "go.uber.org/zap" + + "golang.org/x/crypto/pbkdf2" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/jellydator/ttlcache/v3" + "github.com/libp2p/go-libp2p/core/metrics" - "github.com/libp2p/go-libp2p/core/peer" - peermod "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/multiformats/go-multiaddr" - ma "github.com/multiformats/go-multiaddr" - "github.com/status-im/status-go/connection" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/timesource" - "github.com/status-im/status-go/wakuv2/common" - "github.com/status-im/status-go/wakuv2/persistence" + filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" + "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/api/missing" "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/dnsdisc" - node "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/onlinechecker" 
"github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" - storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - "golang.org/x/crypto/pbkdf2" - "golang.org/x/time/rate" + + gocommon "github.com/status-im/status-go/common" + "github.com/status-im/status-go/connection" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/timesource" + "github.com/status-im/status-go/wakuv2/common" + "github.com/status-im/status-go/wakuv2/persistence" + + node "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) const messageQueueLimit = 1024 @@ -328,14 +331,11 @@ const cacheTTL = 20 * time.Minute const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute -const peersToPublishForLightpush = 2 -const publishingLimiterRate = rate.Limit(2) -const publishingLimitBurst = 4 -/* TODO-nwaku +/* TODO-nwaku type SentEnvelope struct { Envelope *protocol.Envelope - PublishMethod PublishMethod + PublishMethod publish.PublishMethod } type ErrorSendingEnvelope struct { @@ -344,13 +344,118 @@ type ErrorSendingEnvelope struct { } type ITelemetryClient interface { - PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope) + SetDeviceType(deviceType string) PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) PushPeerCount(ctx context.Context, peerCount int) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) + PushMessageCheckSuccess(ctx context.Context, messageHash string) + PushMessageCheckFailure(ctx context.Context, messageHash string) + PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) + PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) +} +*/ + +type WakuMessageHash = string +type WakuPubsubTopic = string +type WakuContentTopic = string + +type WakuConfig struct { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NodeKey string `json:"key,omitempty"` + EnableRelay bool `json:"relay"` + LogLevel string `json:"logLevel"` + DnsDiscovery bool `json:"dnsDiscovery,omitempty"` + DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` + MaxMessageSize string `json:"maxMessageSize,omitempty"` + Staticnodes []string `json:"staticnodes,omitempty"` + Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` + Discv5Discovery bool `json:"discv5Discovery,omitempty"` + ClusterID uint16 `json:"clusterId,omitempty"` + Shards []uint16 `json:"shards,omitempty"` } +// Waku represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. 
+type Waku struct { + wakuCtx unsafe.Pointer + + appDB *sql.DB + + dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery + dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map + dnsDiscAsyncRetrievedSignal chan struct{} + + // Filter-related + filters *common.Filters // Message filters installed with Subscribe function + filterManager *filterapi.FilterManager + + privateKeys map[string]*ecdsa.PrivateKey // Private key storage + symKeys map[string][]byte // Symmetric key storage + keyMu sync.RWMutex // Mutex associated with key stores + + envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node + poolMu sync.RWMutex // Mutex to sync the message and expiration pools + + bandwidthCounter *metrics.BandwidthCounter + + protectedTopicStore *persistence.ProtectedTopicsStore + + sendQueue *publish.MessageQueue + + missingMsgVerifier *missing.MissingMessageVerifier + + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that haven't been decoded + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + cfg *WakuConfig + options []node.WakuNodeOption + + envelopeFeed event.Feed + + storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids + storeMsgIDsMu sync.RWMutex + + messageSender *publish.MessageSender + + topicHealthStatusChan chan peermanager.TopicHealthStatus + connectionNotifChan chan node.PeerConnection + connStatusSubscriptions map[string]*types.ConnStatusSubscription + connStatusMu sync.Mutex + onlineChecker *onlinechecker.DefaultOnlineChecker + state connection.State + + StorenodeCycle *history.StorenodeCycle + HistoryRetriever *history.HistoryRetriever + + logger *zap.Logger + + // NTP Synced timesource + timesource *timesource.NTPTimeSource + + // seededBootnodesForDiscV5 indicates whether we managed to retrieve discovery + // bootnodes successfully + seededBootnodesForDiscV5 bool + + // goingOnline is a channel that notifies when connectivity has changed from offline to online + goingOnline chan struct{} + + // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery + discV5BootstrapNodes []string + + onHistoricMessagesRequestFailed func([]byte, peer.ID, error) + onPeerStats func(types.ConnStatus) + + // statusTelemetryClient ITelemetryClient // TODO-nwaku + + defaultShardInfo protocol.RelayShards +} + +/* TODO-nwaku func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { w.statusTelemetryClient = client } */ @@ -361,6 +466,193 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { return cache } +// New creates a WakuV2 client ready to communicate through the LibP2P network.
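+// With the nwaku backend, New locks the calling goroutine to its OS thread, runs the one-time WakuSetup, builds the node via wakuNew, and subscribes the relay to the default pubsub topic before returning; the previous go-waku construction path is kept below under TODO-nwaku.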
+func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *WakuConfig, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { + // Lock the main goroutine to its current OS thread + runtime.LockOSThread() + + WakuSetup() // This should only be called once in the whole app's life + + node, err := wakuNew(nodeKey, + fleet, + cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, + onPeerStats) + if err != nil { + return nil, err + } + + defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() + if err != nil { + fmt.Println("Error happened:", err.Error()) + } + + err = node.WakuRelaySubscribe(defaultPubsubTopic) + if err != nil { + fmt.Println("Error happened:", err.Error()) + } + + node.WakuSetEventCallback() + + return node, nil + + // TODO-nwaku + /* + var err error + if logger == nil { + logger, err = zap.NewDevelopment() + if err != nil { + return nil, err + } + } + + if ts == nil { + ts = timesource.Default() + } + + cfg = setDefaults(cfg) + if err = cfg.Validate(logger); err != nil { + return nil, err + } + + logger.Info("starting wakuv2 with config", zap.Any("config", cfg)) + + ctx, cancel := context.WithCancel(context.Background()) + + waku := &Waku{ + appDB: appDB, + cfg: cfg, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopeCache: newTTLCache(), + msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), + topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), + connectionNotifChan: make(chan node.PeerConnection, 20), + connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), + ctx: ctx, + cancel: cancel, + wg: sync.WaitGroup{}, + dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), + dnsAddressCacheLock: &sync.RWMutex{}, + dnsDiscAsyncRetrievedSignal: make(chan struct{}), + storeMsgIDs: make(map[gethcommon.Hash]bool), + timesource: ts, + storeMsgIDsMu: sync.RWMutex{}, + logger: logger, + discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, + onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, + onPeerStats: onPeerStats, + onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), + sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + } + + waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) + waku.bandwidthCounter = metrics.NewBandwidthCounter() + + if nodeKey == nil { + // No nodekey is provided, create an ephemeral key + nodeKey, err = crypto.GenerateKey() + if err != nil { + return nil, fmt.Errorf("failed to generate a random go-waku private key: %v", err) + } + } + + hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprint(cfg.Host, ":", cfg.Port)) + if err != nil { + return nil, fmt.Errorf("failed to setup the network interface: %v", err) + } + + libp2pOpts := node.DefaultLibP2POptions + libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter)) + libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) + + opts := []node.WakuNodeOption{ + node.WithLibP2POptions(libp2pOpts...), + node.WithPrivateKey(nodeKey), + node.WithHostAddress(hostAddr), + node.WithConnectionNotification(waku.connectionNotifChan), + node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan), + node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval), + node.WithLogger(logger), + node.WithLogLevel(logger.Level()), + node.WithClusterID(cfg.ClusterID), + 
node.WithMaxMsgSize(1024 * 1024), + } + + if cfg.EnableDiscV5 { + bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes, false) + if err != nil { + logger.Error("failed to get bootstrap nodes", zap.Error(err)) + return nil, err + } + opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate)) + } + shards, err := protocol.TopicsToRelayShards(cfg.DefaultShardPubsubTopic) + if err != nil { + logger.Error("FATAL ERROR: failed to parse relay shards", zap.Error(err)) + return nil, errors.New("failed to parse relay shard, invalid pubsubTopic configuration") + } + if len(shards) == 0 { //Hack so that tests don't fail. TODO: Need to remove this once tests are changed to use proper cluster and shard. + shardInfo := protocol.RelayShards{ClusterID: 0, ShardIDs: []uint16{0}} + shards = append(shards, shardInfo) + } + waku.defaultShardInfo = shards[0] + if cfg.LightClient { + opts = append(opts, node.WithWakuFilterLightNode()) + waku.defaultShardInfo = shards[0] + opts = append(opts, node.WithMaxPeerConnections(cfg.DiscoveryLimit)) + cfg.EnableStoreConfirmationForMessagesSent = false + //TODO: temporary work-around to improve lightClient connectivity, need to be removed once community sharding is implemented + opts = append(opts, node.WithShards(waku.defaultShardInfo.ShardIDs)) + } else { + relayOpts := []pubsub.Option{ + pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)), + } + + if testing.Testing() { + relayOpts = append(relayOpts, pubsub.WithEventTracer(waku)) + } + + opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...)) + opts = append(opts, node.WithMaxPeerConnections(maxRelayPeers)) + cfg.EnablePeerExchangeClient = true //Enabling this until discv5 issues are resolved. This will enable more peers to be connected for relay mesh. 
+ cfg.EnableStoreConfirmationForMessagesSent = true + } + + if cfg.EnableStore { + if appDB == nil { + return nil, errors.New("appDB is required for store") + } + opts = append(opts, node.WithWakuStore()) + dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second)) + if err != nil { + return nil, err + } + opts = append(opts, node.WithMessageProvider(dbStore)) + } + + if !cfg.LightClient { + opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20))) + opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1))) + } + + if appDB != nil { + waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB) + if err != nil { + return nil, err + } + } + + if cfg.EnablePeerExchangeServer { + opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1))) + } + + waku.options = opts + + waku.logger.Info("setup the go-waku node successfully") + + return waku, nil*/ +} + func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { w.connStatusMu.Lock() defer w.connStatusMu.Unlock() @@ -369,8 +661,8 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription { return subscription } -/* TODO-nwaku -func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { +/* TODO-nwaku +func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string, useOnlyDnsDiscCache bool) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} var result []*enode.Node @@ -395,11 +687,13 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) // Use DNS Discovery wg.Add(1) go func(addr string) { + defer gocommon.LogOnPanic() defer wg.Done() - if err := w.dnsDiscover(ctx, addr, retrieveENR); err != nil { - mu.Lock() - w.seededBootnodesForDiscV5 = false - mu.Unlock() + if err := w.dnsDiscover(ctx, addr, retrieveENR, useOnlyDnsDiscCache); err != nil { + go func() { + defer gocommon.LogOnPanic() + w.retryDnsDiscoveryWithBackoff(ctx, addr, w.dnsDiscAsyncRetrievedSignal) + }() } }(addrString) } else { @@ -408,17 +702,23 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) if err != nil { return nil, err } + mu.Lock() result = append(result, bootnode) + mu.Unlock() } } wg.Wait() + if len(result) == 0 { + w.seededBootnodesForDiscV5 = false + } + return result, nil } type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) -func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { +func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer, useOnlyCache bool) error { w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -427,7 +727,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA defer w.dnsAddressCacheLock.Unlock() discNodes, ok := w.dnsAddressCache[enrtreeAddress] - if !ok { + if !ok && !useOnlyCache { nameserver := w.cfg.Nameserver resolver := w.cfg.Resolver @@ -461,6 +761,36 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA return nil } +func (w *Waku) retryDnsDiscoveryWithBackoff(ctx context.Context, addr string, successChan chan<- struct{}) { + retries := 0 + for { + err := w.dnsDiscover(ctx, addr, func(d dnsdisc.DiscoveredNode, wg 
*sync.WaitGroup) {}, false) + if err == nil { + select { + case successChan <- struct{}{}: + default: + } + + break + } + + retries++ + backoff := time.Second * time.Duration(math.Exp2(float64(retries))) + if backoff > time.Minute { + backoff = time.Minute + } + + t := time.NewTimer(backoff) + select { + case <-w.ctx.Done(): + t.Stop() + return + case <-t.C: + t.Stop() + } + } +} + func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() @@ -474,7 +804,8 @@ func (w *Waku) discoverAndConnectPeers() { if strings.HasPrefix(addrString, "enrtree://") { // Use DNS Discovery go func() { - if err := w.dnsDiscover(w.ctx, addrString, fnApply); err != nil { + defer gocommon.LogOnPanic() + if err := w.dnsDiscover(w.ctx, addrString, fnApply, false); err != nil { w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString)) } }() @@ -498,15 +829,16 @@ func (w *Waku) discoverAndConnectPeers() { } */ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) { + defer gocommon.LogOnPanic() // Connection will be prunned eventually by the connection manager if needed // The peer connector in go-waku uses Connect, so it will execute identify as part of its addr := peerInfo.Addrs[0] w.WakuConnect(addr.String(), 1000) } -/* TODO-nwaku +/* TODO-nwaku func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { - w.wg.Add(1) + defer gocommon.LogOnPanic() defer w.wg.Done() if telemetryServerURL == "" { @@ -518,20 +850,14 @@ func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { ticker := time.NewTicker(time.Second * 20) defer ticker.Stop() - today := time.Now() - for { select { case <-w.ctx.Done(): return - case now := <-ticker.C: - // Reset totals when day changes - if now.Day() != today.Day() { - today = now - w.bandwidthCounter.Reset() - } - - go telemetry.PushProtocolStats(w.bandwidthCounter.GetBandwidthByProtocol()) + case <-ticker.C: + bandwidthPerProtocol := w.bandwidthCounter.GetBandwidthByProtocol() + w.bandwidthCounter.Reset() + go telemetry.PushProtocolStats(bandwidthPerProtocol) } } } @@ -545,7 +871,7 @@ func (w *Waku) GetStats() types.StatsSummary { } func (w *Waku) runPeerExchangeLoop() { - w.wg.Add(1) + defer gocommon.LogOnPanic() defer w.wg.Done() if !w.cfg.EnablePeerExchangeClient { @@ -583,12 +909,11 @@ func (w *Waku) runPeerExchangeLoop() { w.dnsAddressCacheLock.RUnlock() if len(peers) != 0 { - // TODO - // err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...), - // peer_exchange.FilterByShard(int(w.defaultShardInfo.ClusterID), int(w.defaultShardInfo.ShardIDs[0]))) - // if err != nil { - // w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) - // } + err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...), + peer_exchange.FilterByShard(int(w.defaultShardInfo.ClusterID), int(w.defaultShardInfo.ShardIDs[0]))) + if err != nil { + w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) + } } } } @@ -602,6 +927,72 @@ func (w *Waku) GetPubsubTopic(topic string) string { return topic } +func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { + topic = w.GetPubsubTopic(topic) + + if !w.node.Relay().IsSubscribed(topic) { + return nil + } + + contentFilter := protocol.NewContentFilter(topic) + + return w.node.Relay().Unsubscribe(w.ctx, 
contentFilter) +} + +func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { + if w.cfg.LightClient { + return errors.New("only available for full nodes") + } + + topic = w.GetPubsubTopic(topic) + + if w.node.Relay().IsSubscribed(topic) { + return nil + } + + if pubkey != nil { + err := w.node.Relay().AddSignedTopicValidator(topic, pubkey) + if err != nil { + return err + } + } + + contentFilter := protocol.NewContentFilter(topic) + + sub, err := w.node.Relay().Subscribe(w.ctx, contentFilter) + if err != nil { + return err + } + + w.wg.Add(1) + go func() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + for { + select { + case <-w.ctx.Done(): + err := w.node.Relay().Unsubscribe(w.ctx, contentFilter) + if err != nil && !errors.Is(err, context.Canceled) { + w.logger.Error("could not unsubscribe", zap.Error(err)) + } + return + case env := <-sub[0].Ch: + err := w.OnNewEnvelopes(env, common.RelayedMessageType, false) + if err != nil { + w.logger.Error("OnNewEnvelopes error", zap.Error(err)) + } + } + } + }() + + return nil +} + +// MaxMessageSize returns the maximum accepted message size. +func (w *Waku) MaxMessageSize() uint32 { + return w.cfg.MaxMessageSize +} + // CurrentTime returns current time. func (w *Waku) CurrentTime() time.Time { return w.timesource.Now() @@ -870,7 +1261,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { return nil, fmt.Errorf("non-existent key ID") } -/* TODO-nwaku +/* TODO-nwaku // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { @@ -924,104 +1315,10 @@ func (w *Waku) SkipPublishToTopic(value bool) { } func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - return - } - w.messageSentCheck.DeleteByMessageIDs(hashes) + w.messageSender.MessagesDelivered(hashes) } */ -func (w *Waku) SetStorePeerID(peerID peer.ID) { - if w.messageSentCheck != nil { - w.messageSentCheck.SetStorePeerID(peerID) - } -} - -func (w *Waku) Query(ctx context.Context, - peerID peer.ID, - query store.FilterCriteria, - cursor []byte, - opts []store.RequestOption, - processEnvelopes bool) ([]byte, int, error) { - requestID := protocol.GenerateRequestID() - - opts = append(opts, - store.WithRequestID(requestID), - store.WithPeer(peerID), - store.WithCursor(cursor)) - - logger := w.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) - - logger.Debug("store.query", - logutils.WakuMessageTimestamp("startTime", query.TimeStart), - logutils.WakuMessageTimestamp("endTime", query.TimeEnd), - zap.Strings("contentTopics", query.ContentTopics.ToList()), - zap.String("pubsubTopic", query.PubsubTopic), - zap.String("cursor", hexutil.Encode(cursor)), - ) - - // queryStart := time.Now() - - params := new(store.Parameters) - - optList := store.DefaultOptions() - optList = append(optList, opts...) 
- for _, opt := range optList { - err := opt(params) - if err != nil { - return nil, 0, err - } - } - - storeRequest := &storepb.StoreQueryRequest{ - RequestId: hex.EncodeToString(requestID), - IncludeData: params.IncludeData, - PaginationForward: params.Forward, - PaginationLimit: ¶ms.PageLimit, - } - - jsonStoreRequest, err := json.Marshal(storeRequest) - if err != nil { - return nil, 0, err - } - - result, err := w.wakuStoreQuery(string(jsonStoreRequest), string(peerID), 10000) - - fmt.Println("Store result ", result) - - // result, err := w.node.Store().Query(ctx, query, opts...) - // queryDuration := time.Since(queryStart) - // if err != nil { - // logger.Error("error querying storenode", zap.Error(err)) - - // if w.onHistoricMessagesRequestFailed != nil { - // w.onHistoricMessagesRequestFailed(requestID, peerID, err) - // } - // return nil, 0, err - // } - - // messages := result.Messages() - // envelopesCount := len(messages) - // w.logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) - // for _, mkv := range messages { - // msg := mkv.Message - - // // Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending - // // See https://github.com/vacp2p/rfc/issues/563 - // mkv.Message.RateLimitProof = nil - - // envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic) - - // err = w.OnNewEnvelopes(envelope, common.StoreMessageType, processEnvelopes) - // if err != nil { - // return nil, 0, err - // } - // } - - // return result.Cursor(), envelopesCount, nil - return nil, 0, nil -} - -/* TODO-nwaku +/* TODO-nwaku // OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter. func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { return w.OnNewEnvelopes(env, common.RelayedMessageType, false) @@ -1030,182 +1327,308 @@ func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error { // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. 
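// With the nwaku backend, Start currently delegates to WakuStart; the go-waku startup sequence (discv5, telemetry, peer exchange, missing-message verification, relay subscriptions) is preserved below under TODO-nwaku.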
func (w *Waku) Start() error { - // if w.ctx == nil { - // w.ctx, w.cancel = context.WithCancel(context.Background()) - // } - - // var err error - // if w.node, err = node.New(w.options...); err != nil { - // return fmt.Errorf("failed to create a go-waku node: %v", err) - // } - - // w.goingOnline = make(chan struct{}) - err := w.WakuStart() if err != nil { fmt.Println("Error happened:", err.Error()) return err } - // if err = w.node.Start(w.ctx); err != nil { - // return fmt.Errorf("failed to start go-waku node: %v", err) - // } + /* TODO-nwaku + if w.ctx == nil { + w.ctx, w.cancel = context.WithCancel(context.Background()) + } - // w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) + var err error + if w.node, err = node.New(w.options...); err != nil { + return fmt.Errorf("failed to create a go-waku node: %v", err) + } - // w.discoverAndConnectPeers() + w.goingOnline = make(chan struct{}) - // if w.cfg.EnableDiscV5 { - // err := w.node.DiscV5().Start(w.ctx) - // if err != nil { - // return err - // } - // } + if err = w.node.Start(w.ctx); err != nil { + return fmt.Errorf("failed to start go-waku node: %v", err) + } - // w.wg.Add(1) - // go func() { - // defer w.wg.Done() - // ticker := time.NewTicker(5 * time.Second) - // defer ticker.Stop() - // for { - // select { - // case <-w.ctx.Done(): - // return - // case <-ticker.C: - // w.checkForConnectionChanges() - // case <-w.topicHealthStatusChan: - // // TODO: https://github.com/status-im/status-go/issues/4628 - // case <-w.connectionNotifChan: - // w.checkForConnectionChanges() - // } - // } - // }() + w.StorenodeCycle = history.NewStorenodeCycle(w.logger) + w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger) - // go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) + w.StorenodeCycle.Start(w.ctx, w.node.Host()) + + w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) + + w.discoverAndConnectPeers() + + if w.cfg.EnableDiscV5 { + err := w.node.DiscV5().Start(w.ctx) + if err != nil { + return err + } + } + + w.wg.Add(1) + go func() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + w.checkForConnectionChanges() + case <-w.topicHealthStatusChan: + // TODO: https://github.com/status-im/status-go/issues/4628 + case <-w.connectionNotifChan: + w.checkForConnectionChanges() + } + } + }() + + if w.cfg.TelemetryServerURL != "" { + w.wg.Add(1) + go func() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + peerTelemetryTickerInterval := time.Duration(w.cfg.TelemetryPeerCountSendPeriod) * time.Millisecond + if peerTelemetryTickerInterval == 0 { + peerTelemetryTickerInterval = 10 * time.Second + } + peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval) + defer peerTelemetryTicker.Stop() + + for { + select { + case <-w.ctx.Done(): + return + case <-peerTelemetryTicker.C: + w.reportPeerMetrics() + } + } + }() + } + + w.wg.Add(1) + go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. 
- // go w.runPeerExchangeLoop() - // if w.cfg.EnableMissingMessageVerification { + w.wg.Add(1) + go w.runPeerExchangeLoop() - // w.missingMsgVerifier = missing.NewMissingMessageVerifier( - // w.node.Store(), - // w, - // w.node.Timesource(), - // w.logger) + if w.cfg.EnableMissingMessageVerification { - // w.missingMsgVerifier.Start(w.ctx) + w.missingMsgVerifier = missing.NewMissingMessageVerifier( + missing.NewDefaultStorenodeRequestor(w.node.Store()), + w, + w.node.Timesource(), + w.logger) - // w.wg.Add(1) - // go func() { - // w.wg.Done() - // for { - // select { - // case <-w.ctx.Done(): - // return - // case envelope := <-w.missingMsgVerifier.C: - // err = w.OnNewEnvelopes(envelope, common.MissingMessageType, false) - // if err != nil { - // w.logger.Error("OnNewEnvelopes error", zap.Error(err)) - // } - // } - // } - // }() - // } + w.missingMsgVerifier.Start(w.ctx) - // if w.cfg.LightClient { - // // Create FilterManager that will main peer connectivity - // // for installed filters - // w.filterManager = filterapi.NewFilterManager(w.ctx, w.logger, w.cfg.MinPeersForFilter, - // w, - // w.node.FilterLightnode()) - // } + w.wg.Add(1) + go func() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + for { + select { + case <-w.ctx.Done(): + return + case envelope := <-w.missingMsgVerifier.C: + err = w.OnNewEnvelopes(envelope, common.MissingMessageType, false) + if err != nil { + w.logger.Error("OnNewEnvelopes error", zap.Error(err)) + } + } + } + }() + } - // err = w.setupRelaySubscriptions() - // if err != nil { - // return err - // } + if w.cfg.LightClient { + // Create FilterManager that will maintain peer connectivity + // for installed filters + w.filterManager = filterapi.NewFilterManager( + w.ctx, + w.logger, + w.cfg.MinPeersForFilter, + w, + w.node.FilterLightnode(), + filterapi.WithBatchInterval(300*time.Millisecond)) + } - // numCPU := runtime.NumCPU() - // for i := 0; i < numCPU; i++ { - // go w.processQueueLoop() - // } + err = w.setupRelaySubscriptions() + if err != nil { + return err + } + numCPU := runtime.NumCPU() + for i := 0; i < numCPU; i++ { + w.wg.Add(1) + go w.processQueueLoop() + } + */ + + // w.wg.Add(1) + + // TODO-nwaku // go w.broadcast() // go w.sendQueue.Start(w.ctx) - // if w.cfg.EnableStoreConfirmationForMessagesSent { - // w.confirmMessagesSent() - // } + /* TODO-nwaku + err = w.startMessageSender() + if err != nil { + return err + } + */ + /* TODO-nwaku // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` - // go w.seedBootnodesForDiscV5() + w.wg.Add(1) + go w.seedBootnodesForDiscV5() + */ return nil } func (w *Waku) checkForConnectionChanges() { - // isOnline := len(w.node.Host().Network().Peers()) > 0 + /* TODO-nwaku + isOnline := len(w.node.Host().Network().Peers()) > 0 - // w.connStatusMu.Lock() + w.connStatusMu.Lock() - // latestConnStatus := types.ConnStatus{ - // IsOnline: isOnline, - // Peers: FormatPeerStats(w.node), - // } + latestConnStatus := types.ConnStatus{ + IsOnline: isOnline, + Peers: FormatPeerStats(w.node), + } - // w.logger.Debug("peer stats", - // zap.Int("peersCount", len(latestConnStatus.Peers)), - // zap.Any("stats", latestConnStatus)) - // for k, subs := range w.connStatusSubscriptions { - // if !subs.Send(latestConnStatus) { - // delete(w.connStatusSubscriptions, k) - // } - // } + w.logger.Debug("peer stats", + zap.Int("peersCount", len(latestConnStatus.Peers)), + zap.Any("stats", latestConnStatus)) + for k, subs := range w.connStatusSubscriptions { + if !subs.Send(latestConnStatus) {
+ delete(w.connStatusSubscriptions, k) + } + } - // w.connStatusMu.Unlock() + w.connStatusMu.Unlock() - // if w.onPeerStats != nil { - // w.onPeerStats(latestConnStatus) - // } + if w.onPeerStats != nil { + w.onPeerStats(latestConnStatus) + } - // if w.statusTelemetryClient != nil { - // connFailures := FormatPeerConnFailures(w.node) - // w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) - // w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) - // } - - // w.ConnectionChanged(connection.State{ - // Type: w.state.Type, //setting state type as previous one since there won't be a change here - // Offline: !latestConnStatus.IsOnline, - // }) + w.ConnectionChanged(connection.State{ + Type: w.state.Type, //setting state type as previous one since there won't be a change here + Offline: !latestConnStatus.IsOnline, + }) */ } -// func (w *Waku) confirmMessagesSent() { -// w.messageSentCheck = publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), w.logger) -// go w.messageSentCheck.Start() +/* TODO: nwaku +func (w *Waku) reportPeerMetrics() { + if w.statusTelemetryClient != nil { + connFailures := FormatPeerConnFailures(w.node) + w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) -// go func() { -// for { -// select { -// case <-w.ctx.Done(): -// return -// case hash := <-w.messageSentCheck.MessageStoredChan: -// w.SendEnvelopeEvent(common.EnvelopeEvent{ -// Hash: hash, -// Event: common.EventEnvelopeSent, -// }) -// case hash := <-w.messageSentCheck.MessageExpiredChan: -// w.SendEnvelopeEvent(common.EnvelopeEvent{ -// Hash: hash, -// Event: common.EventEnvelopeExpired, -// }) -// } -// } -// }() -// } + peerCountByOrigin := make(map[wps.Origin]uint) + peerCountByShard := make(map[uint16]uint) + wakuPeerStore := w.node.Host().Peerstore().(wps.WakuPeerstore) + + for _, peerID := range w.node.Host().Network().Peers() { + origin, err := wakuPeerStore.Origin(peerID) + if err != nil { + origin = wps.Unknown + } + + peerCountByOrigin[origin]++ + pubsubTopics, err := wakuPeerStore.PubSubTopics(peerID) + if err != nil { + continue + } + + keys := make([]string, 0, len(pubsubTopics)) + for k := range pubsubTopics { + keys = append(keys, k) + } + relayShards, err := protocol.TopicsToRelayShards(keys...) 
+ if err != nil { + continue + } + + for _, shards := range relayShards { + for _, shard := range shards.ShardIDs { + peerCountByShard[shard]++ + } + } + } + w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard) + w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin) + } +} +*/ + +/* TODO-nwaku +func (w *Waku) startMessageSender() error { + publishMethod := publish.Relay + if w.cfg.LightClient { + publishMethod = publish.LightPush + } + + sender, err := publish.NewMessageSender(publishMethod, publish.NewDefaultPublisher(w.node.Lightpush(), w.node.Relay()), w.logger) + if err != nil { + w.logger.Error("failed to create message sender", zap.Error(err)) + return err + } + + if w.cfg.EnableStoreConfirmationForMessagesSent { + msgStoredChan := make(chan gethcommon.Hash, 1000) + msgExpiredChan := make(chan gethcommon.Hash, 1000) + messageSentCheck := publish.NewMessageSentCheck(w.ctx, publish.NewDefaultStorenodeMessageVerifier(w.node.Store()), w.StorenodeCycle, w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) + sender.WithMessageSentCheck(messageSentCheck) + + w.wg.Add(1) + go func() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + for { + select { + case <-w.ctx.Done(): + return + case hash := <-msgStoredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeSent, + }) + if w.statusTelemetryClient != nil { + w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex()) + } + case hash := <-msgExpiredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeExpired, + }) + if w.statusTelemetryClient != nil { + w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex()) + } + } + } + }() + } + + if !w.cfg.UseThrottledPublish || testing.Testing() { + // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, + // basically disabling the rate limit functionality + limiter := publish.NewPublishRateLimiter(rate.Inf, 1) + sender.WithRateLimiting(limiter) + } + + w.messageSender = sender + w.messageSender.Start() + + return nil +} +*/ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { w.poolMu.Lock() @@ -1213,7 +1636,7 @@ func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { return w.envelopeCache.Has(gethcommon.Hash(mh)), nil } -/* TODO-nwaku +/* TODO-nwaku func (w *Waku) SetTopicsToVerifyForMissingMessages(peerID peer.ID, pubsubTopic string, contentTopics []string) { if !w.cfg.EnableMissingMessageVerification { return @@ -1235,20 +1658,14 @@ func (w *Waku) setupRelaySubscriptions() error { for _, pt := range protectedTopics { // Adding subscription to protected topics - // err = w.subscribeToPubsubTopicWithWakuRelay(pt.Topic, pt.PubKey) - // if err != nil { - // return err - // } - - fmt.Println("Subscription to topic: ", pt.Topic) - err = w.WakuRelaySubscribe(pt.Topic) + err = w.subscribeToPubsubTopicWithWakuRelay(pt.Topic, pt.PubKey) if err != nil { return err } } } - err := w.WakuRelaySubscribe(w.cfg.DefaultShardPubsubTopic) + err := w.subscribeToPubsubTopicWithWakuRelay(w.cfg.DefaultShardPubsubTopic, nil) if err != nil { return err } @@ -1256,7 +1673,37 @@ func (w *Waku) setupRelaySubscriptions() error { return nil } */ -/* TODO-nwaku +// Stop implements node.Service, stopping the background data propagation thread +// of the Waku protocol. 
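+// With the nwaku backend, Stop cancels the node context, stops the envelope cache, calls WakuStop, and waits for outstanding goroutines; closing the protected topic store remains under TODO-nwaku.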
+func (w *Waku) Stop() error { + w.cancel() + + w.envelopeCache.Stop() + + err := w.WakuStop() + if err != nil { + return err + } + + /* TODO-nwaku + if w.protectedTopicStore != nil { + err := w.protectedTopicStore.Close() + if err != nil { + return err + } + } + + close(w.goingOnline)*/ + + w.wg.Wait() + + w.ctx = nil + w.cancel = nil + + return nil +} + +/* TODO-nwaku func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil @@ -1267,10 +1714,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } - if w.statusTelemetryClient != nil { - w.statusTelemetryClient.PushReceivedEnvelope(w.ctx, envelope) - } - logger := w.logger.With( zap.String("messageType", msgType), zap.Stringer("envelopeHash", envelope.Hash()), @@ -1296,6 +1739,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } +*/ // addEnvelope adds an envelope to the envelope map, used for sending func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) { @@ -1339,7 +1783,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) } return true, nil -} */ +} // postEvent queues the message for further processing. func (w *Waku) postEvent(envelope *common.ReceivedMessage) { @@ -1349,6 +1793,8 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) { /* TODO-nwaku // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { + defer gocommon.LogOnPanic() + defer w.wg.Done() if w.ctx == nil { return } @@ -1360,7 +1806,7 @@ func (w *Waku) processQueueLoop() { w.processMessage(e) } } -} +}*/ func (w *Waku) processMessage(e *common.ReceivedMessage) { logger := w.logger.With( @@ -1378,11 +1824,6 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } - ephemeral := e.Envelope.Message().Ephemeral - if w.cfg.EnableStoreConfirmationForMessagesSent && e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) { - w.messageSentCheck.Add(e.PubsubTopic, e.Hash(), e.Sent) - } - matched := w.filters.NotifyWatchers(e) // If not matched we remove it @@ -1401,7 +1842,7 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { Hash: e.Hash(), Event: common.EventEnvelopeAvailable, }) -} */ +} // GetEnvelope retrieves an envelope from the message queue by its hash. // It returns nil if the envelope can not be found. 
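For orientation, a minimal usage sketch of the public API as it stands after this change; New, Start, SubscribeToConnStatusChanges and Stop all appear in this diff, and the config values are illustrative placeholders mirroring nwaku_test.go below:

	cfg := &WakuConfig{
		Port:        30303,
		EnableRelay: true,
		LogLevel:    "DEBUG",
		ClusterID:   16,
		Shards:      []uint16{64},
	}

	// New(nodeKey, fleet, cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, onPeerStats)
	w, err := New(nil, "", cfg, nil, nil, nil, nil, nil)
	if err != nil {
		panic(err)
	}
	if err := w.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = w.Stop() }()

	// Connection-status updates arrive through the returned subscription.
	sub := w.SubscribeToConnStatusChanges()
	_ = sub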
@@ -1433,35 +1874,35 @@ func (w *Waku) ClearEnvelopesCache() { w.envelopeCache = newTTLCache() } +// TODO-nwaku func (w *Waku) PeerCount() int { return 0 // return w.node.PeerCount() } +// TODO-nwaku func (w *Waku) Peers() types.PeerStats { return nil // return FormatPeerStats(w.node) } -/* TODO-nwaku +/* TODO-nwaku func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) { if w.cfg.LightClient { return nil, errors.New("only available in relay mode") } - // return &types.PeerList{ - // FullMeshPeers: w.node.Relay().PubSub().MeshPeers(topic), - // AllPeers: w.node.Relay().PubSub().ListPeers(topic), - // }, nil - return nil, nil + return &types.PeerList{ + FullMeshPeers: w.node.Relay().PubSub().MeshPeers(topic), + AllPeers: w.node.Relay().PubSub().ListPeers(topic), + }, nil } func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error { topic = w.GetPubsubTopic(topic) if !w.cfg.LightClient { - err := w.WakuRelaySubscribe(topic) - // err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) + err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) if err != nil { return err } @@ -1473,7 +1914,7 @@ func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { topic = w.GetPubsubTopic(topic) if !w.cfg.LightClient { - err := w.WakuRelayUnsubscribe(topic) + err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic) if err != nil { return err } @@ -1509,19 +1950,22 @@ func (w *Waku) RemovePubsubTopicKey(topic string) error { } */ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { - //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers - // if (state.Offline && len(w.node.Host().Network().Peers()) > 0) || - // (w.state.Type != state.Type && !w.state.Offline && !state.Offline) { // network switched between wifi and cellular - // w.logger.Info("connection switched or offline detected via mobile, disconnecting all peers") - // w.node.DisconnectAllPeers() - // if w.cfg.LightClient { - // w.filterManager.NetworkChange() - // } - // } + // TODO-nwaku + /* + //If connection state is reported by something other than peerCount becoming 0 e.g from mobile app, disconnect all peers + if (state.Offline && len(w.node.Host().Network().Peers()) > 0) || + (w.state.Type != state.Type && !w.state.Offline && !state.Offline) { // network switched between wifi and cellular + w.logger.Info("connection switched or offline detected via mobile, disconnecting all peers") + w.node.DisconnectAllPeers() + if w.cfg.LightClient { + w.filterManager.NetworkChange() + } + } + */ } -/* TODO-nwaku func (w *Waku) ConnectionChanged(state connection.State) { + /* TODO-nwaku isOnline := !state.Offline if w.cfg.LightClient { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 @@ -1543,14 +1987,143 @@ func (w *Waku) ConnectionChanged(state connection.State) { w.onlineChecker.SetOnline(isOnline) } w.state = state -} */ + */ +} + +/* TODO-nwaku +// seedBootnodesForDiscV5 tries to fetch bootnodes +// from an ENR periodically. 
+// It backs off exponentially until maxRetries, at which point it restarts from 0 +// It also restarts if there's a connection change signalled from the client +func (w *Waku) seedBootnodesForDiscV5() { + defer gocommon.LogOnPanic() + defer w.wg.Done() + + if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil { + return + } + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + var retries = 0 + + now := func() int64 { + return time.Now().UnixNano() / int64(time.Millisecond) + + } + + var lastTry = now() + + canQuery := func() bool { + backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries))) + + return lastTry+backoff < now() + } + + for { + select { + case <-w.dnsDiscAsyncRetrievedSignal: + if !canQuery() { + continue + } + + err := w.restartDiscV5(true) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + retries = 0 + lastTry = now() + case <-ticker.C: + if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 { + w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers()))) + continue + } + + if !canQuery() { + w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries)) + continue + } + + w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers()))) + err := w.restartDiscV5(false) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + + lastTry = now() + retries++ + // We reset the retries after a while and restart + if retries > bootnodesMaxRetries { + retries = 0 + } + + // If we go online, trigger immediately + case <-w.goingOnline: + if !canQuery() { + continue + } + + err := w.restartDiscV5(false) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + retries = 0 + lastTry = now() + + case <-w.ctx.Done(): + w.logger.Debug("bootnode seeding stopped") + return + } + } +} + +// Restart discv5, re-retrieving bootstrap nodes +func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error { + ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second) + defer cancel() + bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes, useOnlyDNSDiscCache) + if err != nil { + return err + } + if len(bootnodes) == 0 { + return errors.New("failed to fetch bootnodes") + } + + if w.node.DiscV5().ErrOnNotRunning() != nil { + w.logger.Info("is not started restarting") + err := w.node.DiscV5().Start(w.ctx) + if err != nil { + w.logger.Error("Could not start DiscV5", zap.Error(err)) + } + } else { + w.node.DiscV5().Stop() + w.logger.Info("is started restarting") + + select { + case <-w.ctx.Done(): // Don't start discv5 if we are stopping waku + return nil + default: + } + + err := w.node.DiscV5().Start(w.ctx) + if err != nil { + w.logger.Error("Could not start DiscV5", zap.Error(err)) + } + } + + w.logger.Info("restarting discv5 with nodes", zap.Any("nodes", bootnodes)) + return w.node.SetDiscV5Bootnodes(bootnodes) +} +*/ func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) { - // peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) - // if err != nil { - // return "", err - // } - // return peerID, nil + // TODO-nwaku + /* + 
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300) + if err != nil { + return "", err + } + return peerID, nil */ return "", nil } @@ -1559,26 +2132,32 @@ func (w *Waku) timestamp() int64 { } func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) { - // peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200) - // if err != nil { - // return "", err - // } - // return peerID, nil + // TODO-nwaku + /* + peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200) + if err != nil { + return "", err + } + return peerID, nil + */ return "", nil } func (w *Waku) DialPeer(address multiaddr.Multiaddr) error { - // ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) - // defer cancel() - // return w.node.DialPeerWithMultiAddress(ctx, address) + // TODO-nwaku + /* + ctx, cancel := context.WithTimeout(w.ctx, requestTimeout) + defer cancel() + return w.node.DialPeerWithMultiAddress(ctx, address) */ return nil } -func (w *Waku) DialPeerByID(peerId peer.ID) error { - return w.WakuDialPeerById(peerId, string(relay.WakuRelayID_v200), 1000) +func (w *Waku) DialPeerByID(peerID peer.ID) error { + return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000) } func (w *Waku) DropPeer(peerID peer.ID) error { + // TODO-nwaku // return w.node.ClosePeerById(peerID) return nil } @@ -1605,11 +2184,13 @@ func (w *Waku) Clean() error { return nil } +// TODO-nwaku func (w *Waku) PeerID() peer.ID { // return w.node.Host().ID() return "" } +// TODO-nwaku func (w *Waku) Peerstore() peerstore.Peerstore { // return w.node.Host().Peerstore() return nil @@ -1661,6 +2242,7 @@ func FormatPeerStats(wakuNode *node.WakuNode) types.PeerStats { return p } +// TODO-nwaku func (w *Waku) StoreNode() *store.WakuStore { // return w.node.Store() return nil @@ -1670,7 +2252,7 @@ func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { p := make(map[string]int) for _, peerID := range wakuNode.Host().Network().Peers() { peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) - connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) + connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo.ID) if connFailures > 0 { p[peerID.String()] = connFailures } @@ -1678,109 +2260,12 @@ func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { return p } +// TODO-nwaku func (w *Waku) LegacyStoreNode() legacy_store.Store { // return w.node.LegacyStore() return nil } -type WakuMessageHash = string -type WakuPubsubTopic = string -type WakuContentTopic = string - -type WakuConfig struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NodeKey string `json:"key,omitempty"` - EnableRelay bool `json:"relay"` - LogLevel string `json:"logLevel"` - DnsDiscovery bool `json:"dnsDiscovery,omitempty"` - DnsDiscoveryUrl string `json:"dnsDiscoveryUrl,omitempty"` - MaxMessageSize string `json:"maxMessageSize,omitempty"` - Staticnodes []string `json:"staticnodes,omitempty"` - Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` - Discv5Discovery bool `json:"discv5Discovery,omitempty"` - ClusterID uint16 `json:"clusterId,omitempty"` - Shards []uint16 `json:"shards,omitempty"` -} - -type Waku struct { - wakuCtx unsafe.Pointer - - appDB *sql.DB - - dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns 
discovery - dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map - - // Filter-related - filters *common.Filters // Message filters installed with Subscribe function - filterManager *filterapi.FilterManager - - privateKeys map[string]*ecdsa.PrivateKey // Private key storage - symKeys map[string][]byte // Symmetric key storage - keyMu sync.RWMutex // Mutex associated with key stores - - envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node - poolMu sync.RWMutex // Mutex to sync the message and expiration pools - - bandwidthCounter *metrics.BandwidthCounter - - protectedTopicStore *persistence.ProtectedTopicsStore - - sendQueue *publish.MessageQueue - limiter *publish.PublishRateLimiter - - missingMsgVerifier *missing.MissingMessageVerifier - - msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - cfg *WakuConfig - options []node.WakuNodeOption - - envelopeFeed event.Feed - - storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids - storeMsgIDsMu sync.RWMutex - - messageSentCheck *publish.MessageSentCheck - - topicHealthStatusChan chan peermanager.TopicHealthStatus - connectionNotifChan chan node.PeerConnection - connStatusSubscriptions map[string]*types.ConnStatusSubscription - connStatusMu sync.Mutex - onlineChecker *onlinechecker.DefaultOnlineChecker - state connection.State - - logger *zap.Logger - - // NTP Synced timesource - timesource *timesource.NTPTimeSource - - // seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery - // bootnodes successfully - seededBootnodesForDiscV5 bool - - // goingOnline is channel that notifies when connectivity has changed from offline to online - goingOnline chan struct{} - - // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery - discV5BootstrapNodes []string - - onHistoricMessagesRequestFailed func([]byte, peer.ID, error) - onPeerStats func(types.ConnStatus) - - // statusTelemetryClient ITelemetryClient // TODO-nwaku - - defaultShardInfo protocol.RelayShards -} - -func (w *Waku) Stop() error { - return w.WakuStop() -} - func WakuSetup() { C.waku_setup() } @@ -1813,7 +2298,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, ts = timesource.Default() } - /* TODO-nwaku + /* TODO-nwaku cfg = setDefaults(cfg) if err = cfg.Validate(logger); err != nil { return nil, err @@ -1860,7 +2345,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), - //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku + //sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), // TODO-nwaku }, nil } @@ -1892,7 +2377,6 @@ func (self *Waku) WakuStop() error { errMsg := "error WakuStop: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return errors.New(errMsg) } - func (self *Waku) WakuDestroy() error { var resp = C.allocResp() defer C.freeResp(resp) @@ -2203,7 +2687,7 @@ func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) { listenAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) addrss := strings.Split(listenAddresses, ",") for _, addr := range addrss { - addr, err := ma.NewMultiaddr(addr) + addr, err := multiaddr.NewMultiaddr(addr) if err != 
nil { return nil, err } @@ -2309,8 +2793,8 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { itemsPeerIds := strings.Split(peersStr, ",") var peers peer.IDSlice - for _, peer := range itemsPeerIds { - id, err := peermod.Decode(peer) + for _, p := range itemsPeerIds { + id, err := peer.Decode(p) if err != nil { errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error() return nil, errors.New(errMsg) @@ -2435,160 +2919,7 @@ func (self *Waku) DisconnectPeerById(peerId peer.ID) error { // } // MaxMessageSize returns the maximum accepted message size. -/* TODO-nwaku +/* TODO-nwaku func (w *Waku) MaxMessageSize() uint32 { return w.cfg.MaxMessageSize } */ - -// New creates a WakuV2 client ready to communicate through the LibP2P network. -func New(nodeKey *ecdsa.PrivateKey, - fleet string, - cfg *WakuConfig, - logger *zap.Logger, - appDB *sql.DB, - ts *timesource.NTPTimeSource, - onHistoricMessagesRequestFailed func([]byte, peer.ID, error), - onPeerStats func(types.ConnStatus)) (*Waku, error) { - - // Lock the main goroutine to its current OS thread - runtime.LockOSThread() - - WakuSetup() // This should only be called once in the whole app's life - - node, err := wakuNew(nodeKey, - fleet, - cfg, logger, appDB, ts, onHistoricMessagesRequestFailed, - onPeerStats) - if err != nil { - return nil, err - } - - defaultPubsubTopic, err := node.WakuDefaultPubsubTopic() - if err != nil { - fmt.Println("Error happened:", err.Error()) - } - - err = node.WakuRelaySubscribe(defaultPubsubTopic) - if err != nil { - fmt.Println("Error happened:", err.Error()) - } - - node.WakuSetEventCallback() - - return node, nil - - // if !cfg.UseThrottledPublish || testing.Testing() { - // // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, - // // basically disabling the rate limit functionality - // waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) - - // } else { - // waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) - // } - - // waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) - // waku.bandwidthCounter = metrics.NewBandwidthCounter() - - // if nodeKey == nil { - // // No nodekey is provided, create an ephemeral key - // nodeKey, err = crypto.GenerateKey() - // if err != nil { - // return nil, fmt.Errorf("failed to generate a random go-waku private key: %v", err) - // } - // } - - // hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprint(cfg.Host, ":", cfg.Port)) - // if err != nil { - // return nil, fmt.Errorf("failed to setup the network interface: %v", err) - // } - - // libp2pOpts := node.DefaultLibP2POptions - // libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter)) - // libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) - - // opts := []node.WakuNodeOption{ - // node.WithLibP2POptions(libp2pOpts...), - // node.WithPrivateKey(nodeKey), - // node.WithHostAddress(hostAddr), - // node.WithConnectionNotification(waku.connectionNotifChan), - // node.WithTopicHealthStatusChannel(waku.topicHealthStatusChan), - // node.WithKeepAlive(randomPeersKeepAliveInterval, allPeersKeepAliveInterval), - // node.WithLogger(logger), - // node.WithLogLevel(logger.Level()), - // node.WithClusterID(cfg.ClusterID), - // node.WithMaxMsgSize(1024 * 1024), - // } - - // if cfg.EnableDiscV5 { - // bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) - // if err != nil { - // logger.Error("failed to 
-	// 		return nil, err
-	// 	}
-	// 	opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate))
-	// }
-	// shards, err := protocol.TopicsToRelayShards(cfg.DefaultShardPubsubTopic)
-	// if err != nil {
-	// 	logger.Error("FATAL ERROR: failed to parse relay shards", zap.Error(err))
-	// 	return nil, errors.New("failed to parse relay shard, invalid pubsubTopic configuration")
-	// }
-	// if len(shards) == 0 { //Hack so that tests don't fail. TODO: Need to remove this once tests are changed to use proper cluster and shard.
-	// 	shardInfo := protocol.RelayShards{ClusterID: 0, ShardIDs: []uint16{0}}
-	// 	shards = append(shards, shardInfo)
-	// }
-	// waku.defaultShardInfo = shards[0]
-	// if cfg.LightClient {
-	// 	opts = append(opts, node.WithWakuFilterLightNode())
-	// 	waku.defaultShardInfo = shards[0]
-	// 	opts = append(opts, node.WithMaxPeerConnections(cfg.DiscoveryLimit))
-	// 	cfg.EnableStoreConfirmationForMessagesSent = false
-	// 	//TODO: temporary work-around to improve lightClient connectivity, need to be removed once community sharding is implemented
-	// 	opts = append(opts, node.WithPubSubTopics(cfg.DefaultShardedPubsubTopics))
-	// } else {
-	// 	relayOpts := []pubsub.Option{
-	// 		pubsub.WithMaxMessageSize(int(waku.cfg.MaxMessageSize)),
-	// 	}
-
-	// 	if waku.logger.Level() == zap.DebugLevel {
-	// 		relayOpts = append(relayOpts, pubsub.WithEventTracer(waku))
-	// 	}
-
-	// 	opts = append(opts, node.WithWakuRelayAndMinPeers(waku.cfg.MinPeersForRelay, relayOpts...))
-	// 	opts = append(opts, node.WithMaxPeerConnections(maxRelayPeers))
-	// 	cfg.EnablePeerExchangeClient = true //Enabling this until discv5 issues are resolved. This will enable more peers to be connected for relay mesh.
-	// 	cfg.EnableStoreConfirmationForMessagesSent = true
-	// }
-
-	// if cfg.EnableStore {
-	// 	if appDB == nil {
-	// 		return nil, errors.New("appDB is required for store")
-	// 	}
-	// 	opts = append(opts, node.WithWakuStore())
-	// 	dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second))
-	// 	if err != nil {
-	// 		return nil, err
-	// 	}
-	// 	opts = append(opts, node.WithMessageProvider(dbStore))
-	// }
-
-	// if !cfg.LightClient {
-	// 	opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20)))
-	// 	opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1)))
-	// }
-
-	// if appDB != nil {
-	// 	waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
-	// 	if err != nil {
-	// 		return nil, err
-	// 	}
-	// }
-
-	// if cfg.EnablePeerExchangeServer {
-	// 	opts = append(opts, node.WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)))
-	// }
-
-	// waku.options = opts
-	// waku.logger.Info("setup the go-waku node successfully")
-
-	// return waku, nil
-}
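Note: with this patch, `New` reduces to a thin wrapper over the cgo bindings (`WakuSetup` once per process, `wakuNew`, relay subscription on the default pubsub topic, event callback); the old libp2p option wiring survives only as the commented-out block above. A minimal caller sketch, not part of the patch, using only `New`, `Start`, `GetNumConnectedPeers` and `Stop` exactly as exercised by the test below (config values are illustrative, and the nil arguments mean no node key, logger, DB, timesource or callbacks):

```go
package main

import (
	"fmt"
	"log"

	"github.com/status-im/status-go/wakuv2"
)

func main() {
	cfg := wakuv2.WakuConfig{
		Port:        30303,
		EnableRelay: true,
		LogLevel:    "INFO",
		ClusterID:   16,
		Shards:      []uint16{64},
	}

	// New runs WakuSetup once for the process, builds the node via wakuNew,
	// subscribes the relay to the default pubsub topic and installs the
	// event callback before returning.
	w, err := wakuv2.New(nil, "", &cfg, nil, nil, nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Start(); err != nil {
		log.Fatal(err)
	}
	defer func() { _ = w.Stop() }()

	numPeers, err := w.GetNumConnectedPeers()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected peers:", numPeers)
}
```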
"enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im", - DnsDiscovery: true, + DnsDiscovery: true, Discv5Discovery: true, - Staticnodes: []string{storeNodeInfo.ListenAddresses[0]}, - ClusterID: 16, - Shards: []uint16{64}, + Staticnodes: []string{storeNodeInfo.ListenAddresses[0]}, + ClusterID: 16, + Shards: []uint16{64}, } - + w, err := New(nil, "", &nwakuConfig, nil, nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, w.Start()) @@ -190,7 +190,7 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - + numConnected, err := w.GetNumConnectedPeers() if err != nil { return err @@ -204,7 +204,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) // Get local store node address - storeNode, err :=peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) + storeNode, err := peer.AddrInfoFromString(storeNodeInfo.ListenAddresses[0]) require.NoError(t, err) require.NoError(t, err) @@ -222,7 +222,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID) require.True(t, isDisconnected, "nwaku should be disconnected from the store node") - + // Re-connect err = w.DialPeerByID(storeNode.ID) require.NoError(t, err) @@ -231,66 +231,66 @@ func TestBasicWakuV2(t *testing.T) { connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300)) require.NoError(t, err) require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node") - - /* - filter := &common.Filter{ - PubsubTopic: config.DefaultShardPubsubTopic, - Messages: common.NewMemoryMessageStore(), - ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), - } - _, err = w.Subscribe(filter) - require.NoError(t, err) - - msgTimestamp := w.timestamp() - contentTopic := maps.Keys(filter.ContentTopics)[0] - - time.Sleep(2 * time.Second) - - _, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: contentTopic.ContentTopic(), - Version: proto.Uint32(0), - Timestamp: &msgTimestamp, - }, nil) - - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - messages := filter.Retrieve() - require.Len(t, messages, 1) - - timestampInSeconds := msgTimestamp / int64(time.Second) - marginInSeconds := 20 - - options = func(b *backoff.ExponentialBackOff) { - b.MaxElapsedTime = 60 * time.Second - b.InitialInterval = 500 * time.Millisecond - } - err = tt.RetryWithBackOff(func() error { - _, envelopeCount, err := w.Query( - context.Background(), - storeNode.PeerID, - store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()), - TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), - TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), - }, - nil, - nil, - false, - ) - if err != nil || envelopeCount == 0 { - // in case of failure extend timestamp margin up to 40secs - if marginInSeconds < 40 { - marginInSeconds += 5 - } - return errors.New("no messages received from store node") + /* + filter := &common.Filter{ + PubsubTopic: config.DefaultShardPubsubTopic, + Messages: common.NewMemoryMessageStore(), + ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), } - return nil - }, options) - require.NoError(t, err) */ + + _, err = w.Subscribe(filter) + 
+		require.NoError(t, err)
+
+		msgTimestamp := w.timestamp()
+		contentTopic := maps.Keys(filter.ContentTopics)[0]
+
+		time.Sleep(2 * time.Second)
+
+		_, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{
+			Payload: []byte{1, 2, 3, 4, 5},
+			ContentTopic: contentTopic.ContentTopic(),
+			Version: proto.Uint32(0),
+			Timestamp: &msgTimestamp,
+		}, nil)
+
+		require.NoError(t, err)
+
+		time.Sleep(1 * time.Second)
+
+		messages := filter.Retrieve()
+		require.Len(t, messages, 1)
+
+		timestampInSeconds := msgTimestamp / int64(time.Second)
+		marginInSeconds := 20
+
+		options = func(b *backoff.ExponentialBackOff) {
+			b.MaxElapsedTime = 60 * time.Second
+			b.InitialInterval = 500 * time.Millisecond
+		}
+		err = tt.RetryWithBackOff(func() error {
+			_, envelopeCount, err := w.Query(
+				context.Background(),
+				storeNode.PeerID,
+				store.FilterCriteria{
+					ContentFilter: protocol.NewContentFilter(config.DefaultShardPubsubTopic, contentTopic.ContentTopic()),
+					TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
+					TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
+				},
+				nil,
+				nil,
+				false,
+			)
+			if err != nil || envelopeCount == 0 {
+				// in case of failure extend timestamp margin up to 40secs
+				if marginInSeconds < 40 {
+					marginInSeconds += 5
+				}
+				return errors.New("no messages received from store node")
+			}
+			return nil
+		}, options)
+		require.NoError(t, err) */
 
 	require.NoError(t, w.Stop())
 }
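Note: the still-disabled store-query block above retries with a time window that widens by 5 s per failed attempt, from 20 s up to 40 s around the message timestamp; timestamps are nanoseconds, so the seconds margin is scaled back to nanoseconds when building `TimeStart`/`TimeEnd`. A standalone sketch of that arithmetic (hypothetical helper, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// queryWindow mirrors the TimeStart/TimeEnd computation in the disabled
// test: truncate the nanosecond timestamp to whole seconds, apply the
// margin, and scale back up to nanoseconds.
func queryWindow(msgTimestampNs, marginSeconds int64) (startNs, endNs int64) {
	ts := msgTimestampNs / int64(time.Second)
	startNs = (ts - marginSeconds) * int64(time.Second)
	endNs = (ts + marginSeconds) * int64(time.Second)
	return startNs, endNs
}

func main() {
	now := time.Now().UnixNano()
	// Widen the margin the way the retry loop does: start at 20s and add
	// 5s per failed attempt, capped at 40s.
	for margin := int64(20); margin <= 40; margin += 5 {
		start, end := queryWindow(now, margin)
		fmt.Printf("margin %2ds -> window [%d, %d]\n", margin, start, end)
	}
}
```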