diff --git a/VERSION b/VERSION index 03e595378..d4fd29afb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.88.4 +0.88.5 diff --git a/go.mod b/go.mod index 38c587fb3..3bb36b5ca 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a + github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index 678b8db7d..cf87b77b1 100644 --- a/go.sum +++ b/go.sum @@ -1102,8 +1102,8 @@ github.com/status-im/go-ethereum v1.10.4-status.2 h1:uvcD2U7skYqPQviARFb4w3wZyFS github.com/status-im/go-ethereum v1.10.4-status.2/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk= github.com/status-im/go-multiaddr-ethv4 v1.2.0/go.mod h1:2VQ3C+9zEurcceasz12gPAtmEzCeyLUGPeKLSXYQKHo= -github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a h1:PQ/S9OaV3I+peTU0YC7q8/AImudIKfuLNDfMzf+w8DY= -github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8= +github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf h1:r4TenmNYnine3l1qGFOQW74s+27M0HlLipQxxZ3PJbI= +github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/node/status_node_services.go b/node/status_node_services.go index 7ae37fc35..bf621238b 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -248,8 +248,12 @@ func (b *StatusNode) wakuV2Service(nodeKey string, wakuCfg *params.WakuV2Config, SoftBlacklistedPeerIDs: wakuCfg.SoftBlacklistedPeerIDs, Host: wakuCfg.Host, Port: wakuCfg.Port, - BootNodes: clusterCfg.WakuNodes, - StoreNodes: clusterCfg.WakuStoreNodes, + LightClient: wakuCfg.LightClient, + KeepAliveInterval: wakuCfg.KeepAliveInterval, + RelayNodes: clusterCfg.RelayNodes, + StoreNodes: clusterCfg.StoreNodes, + FilterNodes: clusterCfg.FilterNodes, + LightpushNodes: clusterCfg.LightpushNodes, } if cfg.Host == "" { diff --git a/params/config.go b/params/config.go index d2399f870..343365971 100644 --- a/params/config.go +++ b/params/config.go @@ -163,7 +163,10 @@ type WakuV2Config struct { // Port number in which to start libp2p protocol (0 for random) Port int - // LightClient should be true if the node should start with an empty bloom filter and not forward messages from other nodes + // Interval of time in seconds to send a ping to peers to keep the connection to them alive + KeepAliveInterval int + + // LightClient should be true if the node will not relay messages and only rely on lightpush/filter nodes LightClient bool // FullNode should be true if waku should always acta as a full node @@ -274,11 +277,17 @@ type ClusterConfig struct { // RendezvousNodes is a list rendezvous discovery nodes. RendezvousNodes []string - // WakuNodes is a list of wakuv2 libp2p nodes - WakuNodes []string + // RelayNodes is a list of wakuv2 relay nodes (libp2p) + RelayNodes []string - // WakuStoreNodes is a list of wakuv2 store nodes - WakuStoreNodes []string + // StoreNodes is a list of wakuv2 store nodes (libp2p) + StoreNodes []string + + // FilterNodes is a list of wakuv2 filter nodes (libp2p) + FilterNodes []string + + // LightpushNodes is a list of wakuv2 lightpush nodes (libp2p) + LightpushNodes []string } // String dumps config object as nicely indented JSON diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index fe537f402..72dd7c906 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -307,11 +307,10 @@ func (w *WakuNode) ID() string { return w.host.ID().Pretty() } -func (w *WakuNode) GetPeerStats() PeerStats { - return w.peers -} - func (w *WakuNode) IsOnline() bool { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + hasRelay := false hasLightPush := false hasStore := false @@ -340,6 +339,9 @@ func (w *WakuNode) IsOnline() bool { } func (w *WakuNode) HasHistory() bool { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + for _, v := range w.peers { for _, protocol := range v { if protocol == string(store.WakuStoreProtocolId) { @@ -721,11 +723,6 @@ func (w *WakuNode) DialPeerByID(peerID peer.ID) error { return w.connect(info) } -func (w *WakuNode) DialPeerByID(peerID peer.ID) error { - info := w.host.Peerstore().PeerInfo(peerID) - return w.connect(info) -} - func (w *WakuNode) ClosePeerByAddress(address string) error { p, err := ma.NewMultiaddr(address) if err != nil { @@ -756,6 +753,8 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error { } func (w *WakuNode) PeerCount() int { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() return len(w.peers) } @@ -771,6 +770,7 @@ func (w *WakuNode) Peers() PeerStats { } func (w *WakuNode) startKeepAlive(t time.Duration) { + log.Info("Setting up ping protocol with duration of ", t) w.ping = ping.NewPingService(w.host) @@ -804,15 +804,16 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { go func(peer peer.ID) { peerFound := false + w.peersMutex.Lock() for p := range w.peers { if p == peer { peerFound = true break } } - - //log.Info("###PING " + s + " before fetching result") - //logwriter.Write([]byte("###PING " + s + " before fetching result")) + defer w.peersMutex.Unlock() + log.Debug("###PING before fetching result") + pingTicker := time.NewTicker(time.Duration(1) * time.Second) isError := false select { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go index 3dbd9c894..13c5eecc8 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go @@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { } } -// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption -// accepts a list of WakuFilter gossipsub options to setup the protocol -func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption { +// WithWakuFilter enables the Waku V2 Filter protocol. +func WithWakuFilter() WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true - params.wOpts = opts return nil } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index c9be11b62..ec8c63a69 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -95,22 +95,28 @@ func (wf *WakuFilter) onRequest(s network.Stream) { return } - log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) stats.Record(wf.ctx, metrics.Messages.M(1)) - if filterRPCRequest.Request != nil { + if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { + // We're on a light node. + // This is a message push coming from a full node. + + log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") + wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) + } else if filterRPCRequest.Request != nil { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { subscriber := Subscriber{peer: string(s.Conn().RemotePeer()), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} wf.subscribers = append(wf.subscribers, subscriber) - log.Info("Full node, add a filter subscriber ", subscriber) + log.Info("filter full node, add a filter subscriber: ", subscriber.peer) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } else { peerId := string(s.Conn().RemotePeer()) - log.Info("Full node, remove a filter subscriber ", peerId) + log.Info("filter full node, remove a filter subscriber: ", peerId) contentFilters := filterRPCRequest.Request.ContentFilters var peerIdsToRemove []string for _, subscriber := range wf.subscribers { @@ -151,20 +157,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) { stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } - } else if filterRPCRequest.Push != nil { - // We're on a light node. - // This is a message push coming from a full node. - - log.Info("Light node, received a message push ", *filterRPCRequest.Push) - wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) } - } func (wf *WakuFilter) peerListener() { for e := range wf.peerChan { if e.Connectedness == network.NotConnected { - log.Info("Filter Notification received ", e.Peer) + log.Info("filter Notification received ", e.Peer) i := 0 // Delete subscribers matching deleted peer for _, s := range wf.subscribers { @@ -174,7 +173,7 @@ func (wf *WakuFilter) peerListener() { } } - log.Info("Filter, deleted subscribers: ", len(wf.subscribers)-i) + log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i) wf.subscribers = wf.subscribers[:i] } } @@ -217,20 +216,22 @@ func (wf *WakuFilter) FilterListener() { for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { - log.Info("Found matching contentTopic ", filter, msg) + log.Info("found matching contentTopic ", filter, msg) msgArr := []*pb.WakuMessage{msg} // Do a message push to light node pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} - log.Info("Pushing a message to light node: ", pushRPC) + log.Info("pushing a message to light node: ", pushRPC) conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId) if err != nil { // @TODO more sophisticated error handling here - log.Error("Failed to open peer stream") + log.Error("failed to open peer stream") //waku_filter_errors.inc(labelValues = [dialFailure]) return err } + + defer conn.Close() writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { @@ -262,12 +263,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId() writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} - log.Info("Sending filterRPC: ", filterRPC) + log.Info("sending filterRPC: ", filterRPC) err = writer.WriteMsg(filterRPC) if err != nil { log.Error("failed to write message", err) @@ -281,7 +284,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( return "", err } } else { - log.Info("Error selecting peer: ", err) + log.Info("error selecting peer: ", err) } return "", nil @@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId() @@ -303,7 +308,6 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) if err != nil { log.Error("failed to write message", err) } - //return some(id) } else { // @TODO more sophisticated error handling here log.Error("failed to connect to remote peer", err) diff --git a/vendor/modules.txt b/vendor/modules.txt index b4139c397..85f7f8d9e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -426,7 +426,7 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.0 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a +# github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf github.com/status-im/go-waku/waku/v2/metrics github.com/status-im/go-waku/waku/v2/node github.com/status-im/go-waku/waku/v2/protocol diff --git a/wakuv2/config.go b/wakuv2/config.go index ebc0d5b04..218da4ffd 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -28,12 +28,17 @@ type Config struct { SoftBlacklistedPeerIDs []string `toml:",omitempty"` Host string `toml:",omitempty"` Port int `toml:",omitempty"` - BootNodes []string `toml:",omitempty"` + KeepAliveInterval int `toml:",omitempty"` + LightClient bool `toml:",omitempty"` + RelayNodes []string `toml:",omitempty"` StoreNodes []string `toml:",omitempty"` + FilterNodes []string `toml:",omitempty"` + LightpushNodes []string `toml:",omitempty"` } var DefaultConfig = Config{ - MaxMessageSize: common.DefaultMaxMessageSize, - Host: "0.0.0.0", - Port: 60000, + MaxMessageSize: common.DefaultMaxMessageSize, + Host: "0.0.0.0", + Port: 60000, + KeepAliveInterval: 1, // second } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index dc12c1a3d..4e4ec8aee 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -47,6 +47,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/metrics" + "github.com/status-im/go-waku/waku/v2/protocol" wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/relay" @@ -63,6 +64,7 @@ import ( const messageQueueLimit = 1024 type settings struct { + LightClient bool // Indicates if the node is a light client MaxMsgSize uint32 // Maximal message length allowed by the waku node EnableConfirmations bool // Enable sending message confirmations SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from @@ -79,7 +81,8 @@ type ConnStatus struct { type Waku struct { node *node.WakuNode // reference to a libp2p waku node - filters *common.Filters // Message filters installed with Subscribe function + filters *common.Filters // Message filters installed with Subscribe function + filterMsgChannel chan *protocol.Envelope // Channel for wakuv2 filter messages privateKeys map[string]*ecdsa.PrivateKey // Private key storage symKeys map[string][]byte // Symmetric key storage @@ -130,10 +133,12 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { waku.settings = settings{ MaxMsgSize: cfg.MaxMessageSize, SoftBlacklistedPeerIDs: make(map[string]bool), + LightClient: cfg.LightClient, } waku.filters = common.NewFilters() waku.bandwidthCounter = metrics.NewBandwidthCounter() + waku.filterMsgChannel = make(chan *protocol.Envelope, 1024) var privateKey *ecdsa.PrivateKey var err error @@ -153,20 +158,35 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { return nil, fmt.Errorf("failed to setup the network interface: %v", err) } - connStatusChan := make(chan node.ConnStatus) + connStatusChan := make(chan node.ConnStatus, 100) - keepAliveInt := 1 - waku.node, err = node.New(context.Background(), + if cfg.KeepAliveInterval == 0 { + cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval + } + + opts := []node.WakuNodeOption{ node.WithLibP2POptions( libp2p.BandwidthReporter(waku.bandwidthCounter), ), node.WithPrivateKey(privateKey), node.WithHostAddress([]net.Addr{hostAddr}), - node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize))), node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages) node.WithConnStatusChan(connStatusChan), - node.WithKeepAlive(time.Duration(keepAliveInt)*time.Second), - ) + node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second), + } + + if cfg.LightClient { + opts = append(opts, node.WithLightPush()) + opts = append(opts, node.WithWakuFilter()) + } else { + opts = append(opts, node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize)))) + } + + if waku.node, err = node.New(context.Background(), opts...); err != nil { + return nil, fmt.Errorf("failed to start the go-waku node: %v", err) + } + + waku.addPeers(cfg) go func() { for { @@ -178,36 +198,55 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { } } }() - if err != nil { - fmt.Println(err) - return nil, fmt.Errorf("failed to start the go-waku node: %v", err) - } - for _, bootnode := range cfg.BootNodes { - err := waku.node.DialPeer(bootnode) - if err != nil { - log.Warn("Could not dial peer", err) - } else { - log.Info("Bootnode dialed successfully", bootnode) - } - } - - for _, storenode := range cfg.StoreNodes { - peerID, err := waku.node.AddStorePeer(storenode) - if err != nil { - log.Warn("Could not add store peer", err) - } else { - log.Info("Storepeeer dialed successfully", "peerId", peerID.Pretty()) - } - } - - go waku.runMsgLoop() + go waku.runFilterMsgLoop() + go waku.runRelayMsgLoop() log.Info("setup the go-waku node successfully") return waku, nil } +func (w *Waku) addPeers(cfg *Config) { + if !cfg.LightClient { + for _, relaynode := range cfg.RelayNodes { + err := w.node.DialPeer(relaynode) + if err != nil { + log.Warn("could not dial peer", err) + } else { + log.Info("relay peer dialed successfully", relaynode) + } + } + } + + for _, storenode := range cfg.StoreNodes { + peerID, err := w.node.AddStorePeer(storenode) + if err != nil { + log.Warn("could not add store peer", err) + } else { + log.Info("store peer added successfully", "peerId", peerID.Pretty()) + } + } + + for _, filternode := range cfg.FilterNodes { + peerID, err := w.node.AddFilterPeer(filternode) + if err != nil { + log.Warn("could not add filter peer", err) + } else { + log.Info("filter peer added successfully", "peerId", peerID.Pretty()) + } + } + + for _, lightpushnode := range cfg.LightpushNodes { + peerID, err := w.node.AddLightPushPeer(lightpushnode) + if err != nil { + log.Warn("could not add lightpush peer", err) + } else { + log.Info("lightpush peer added successfully", "peerId", peerID.Pretty()) + } + } +} + func (w *Waku) GetStats() types.StatsSummary { stats := w.bandwidthCounter.GetBandwidthTotals() return types.StatsSummary{ @@ -216,7 +255,11 @@ func (w *Waku) GetStats() types.StatsSummary { } } -func (w *Waku) runMsgLoop() { +func (w *Waku) runRelayMsgLoop() { + if w.settings.LightClient { + return + } + sub, err := w.node.Subscribe(nil) if err != nil { fmt.Println("Could not subscribe:", err) @@ -225,13 +268,49 @@ func (w *Waku) runMsgLoop() { for env := range sub.C { envelopeErrors, err := w.OnNewEnvelopes(env) - // TODO: should these be handled? _ = envelopeErrors _ = err } } +func (w *Waku) runFilterMsgLoop() { + if !w.settings.LightClient { + return + } + + for { + select { + case <-w.quit: + return + case env := <-w.filterMsgChannel: + envelopeErrors, err := w.OnNewEnvelopes(env) + // TODO: should these be handled? + _ = envelopeErrors + _ = err + } + } +} + +func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) { + pubsubTopic := relay.GetTopic(nil) + filterRequest := pb.FilterRequest{ + Topic: string(pubsubTopic), + Subscribe: true, + } + var contentFilters []*pb.FilterRequest_ContentFilter + for _, topic := range topics { + t := &pb.FilterRequest_ContentFilter{ContentTopic: common.BytesToTopic(topic).ContentTopic()} + contentFilters = append(contentFilters, t) + } + filterRequest.ContentFilters = contentFilters + err := w.node.SubscribeFilter(context.Background(), filterRequest, w.filterMsgChannel) + if err != nil { + w.logger.Warn("could not add wakuv2 filter for topics", zap.Any("topics", topics)) + return + } +} + // MaxMessageSize returns the maximum accepted message size. func (w *Waku) MaxMessageSize() uint32 { w.settingsMu.RLock() @@ -527,6 +606,10 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) { return s, err } + if w.settings.LightClient { + w.subscribeWakuFilterTopic(f.Topics) + } + return s, nil } @@ -536,9 +619,25 @@ func (w *Waku) GetFilter(id string) *common.Filter { } // Unsubscribe removes an installed message handler. -// TODO: This does not seem to update the bloom filter, but does update -// the topic interest map func (w *Waku) Unsubscribe(id string) error { + filter := w.filters.Get(id) + if filter != nil && w.settings.LightClient { + pubsubTopic := relay.GetTopic(nil) + filterRequest := pb.FilterRequest{ + Topic: string(pubsubTopic), + Subscribe: true, + } + + var contentFilters []*pb.FilterRequest_ContentFilter + for _, topic := range filter.Topics { + t := &pb.FilterRequest_ContentFilter{ContentTopic: common.BytesToTopic(topic).ContentTopic()} + contentFilters = append(contentFilters, t) + } + filterRequest.ContentFilters = contentFilters + + w.node.UnsubscribeFilter(context.Background(), filterRequest) + } + ok := w.filters.Uninstall(id) if !ok { return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id) @@ -547,8 +646,6 @@ func (w *Waku) Unsubscribe(id string) error { } // Unsubscribe removes an installed message handler. -// TODO: This does not seem to update the bloom filter, but does update -// the topic interest map func (w *Waku) UnsubscribeMany(ids []string) error { for _, id := range ids { w.logger.Debug("cleaning up filter", zap.String("id", id)) @@ -605,6 +702,7 @@ func (w *Waku) Start() error { func (w *Waku) Stop() error { w.node.Stop() close(w.quit) + close(w.filterMsgChannel) return nil }