diff --git a/examples/filter2/main.go b/examples/filter2/main.go index a25f188a..f2394485 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -84,14 +84,17 @@ func main() { // // Send FilterRequest from light node to full node - filterChan := make(filter.ContentFilterChan) + _, filterChan, err := lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}) + if err != nil { + panic(err) + } go func() { for env := range filterChan { log.Info("Light node received msg, ", string(env.Message().Payload)) } + log.Info("Message channel closed!") }() - lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}, filterChan) go writeLoop(ctx, fullNode) go readLoop(ctx, fullNode) diff --git a/tests/connection_test.go b/tests/connection_test.go index 54531c20..88eb837f 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -40,7 +40,7 @@ func TestBasicSendingReceiving(t *testing.T) { require.NoError(t, write(ctx, wakuNode, "test")) - sub, err := wakuNode.Subscribe(nil) + sub, err := wakuNode.Subscribe(ctx, nil) require.NoError(t, err) value := <-sub.C diff --git a/waku/v2/node/subscription.go b/waku/v2/node/subscription.go index 7f07968f..10a46b52 100644 --- a/waku/v2/node/subscription.go +++ b/waku/v2/node/subscription.go @@ -18,8 +18,11 @@ type Subscription struct { // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { + subs.mutex.Lock() + defer subs.mutex.Unlock() if !subs.closed { close(subs.quit) + subs.closed = true } } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index d4aa3384..4e91bce1 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -46,6 +46,7 @@ type WakuNode struct { lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService ping *ping.PingService + store *store.WakuStore subscriptions map[relay.Topic][]*Subscription subscriptionsMutex sync.Mutex @@ -184,13 +185,30 @@ func (w *WakuNode) Stop() { w.rendezvous.Stop() } - for _, topic := range w.relay.Topics() { - for _, sub := range w.subscriptions[topic] { - sub.Unsubscribe() + if w.relay != nil { + for _, topic := range w.relay.Topics() { + for _, sub := range w.subscriptions[topic] { + sub.Unsubscribe() + } } + w.subscriptions = nil } - w.subscriptions = nil + if w.filter != nil { + w.filter.Stop() + for _, filter := range w.filters { + close(filter.Chan) + } + w.filters = nil + } + + if w.lightPush != nil { + w.lightPush.Stop() + } + + if w.store != nil { + w.store.Stop() + } w.host.Close() } @@ -264,7 +282,8 @@ func (w *WakuNode) mountRendezvous() error { } func (w *WakuNode) startStore() { - w.opts.store.Start(w.ctx, w.host) + w.store = w.opts.store + w.store.Start(w.ctx, w.host) if w.opts.shouldResume { // TODO: extract this to a function and run it when you go offline @@ -315,7 +334,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer. } func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { - if w.opts.store == nil { + if w.store == nil { return nil, errors.New("WakuStore is not set") } @@ -328,7 +347,7 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime query.StartTime = startTime query.EndTime = endTime query.PagingInfo = new(pb.PagingInfo) - result, err := w.opts.store.Query(ctx, query, opts...) + result, err := w.store.Query(ctx, query, opts...) if err != nil { return nil, err } @@ -336,11 +355,11 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime } func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { - if w.opts.store == nil { + if w.store == nil { return errors.New("WakuStore is not set") } - result, err := w.opts.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList) + result, err := w.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList) if err != nil { return err } @@ -439,34 +458,62 @@ func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters -// TODO: what's up with this channel?.......................... is it closed eventually? -func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string, ch filter.ContentFilterChan) error { +func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string) (filterID string, ch chan *protocol.Envelope, err error) { if node.filter == nil { - return errors.New("WakuFilter is not set") + err = errors.New("WakuFilter is not set") + return } + // TODO: should be possible to pass the peerID as option or autoselect peer. + // TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID + // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // ContentFilterChan takes MessagePush structs - - id, peerID, err := node.filter.Subscribe(ctx, topic, contentTopics) - if id == "" || err != nil { + var peerID *peer.ID + filterID, peerID, err = node.filter.Subscribe(ctx, topic, contentTopics) + if filterID == "" || err != nil { // Failed to subscribe log.Error("remote subscription to filter failed") - return err + return } + ch = make(chan *protocol.Envelope, 1024) // To avoid blocking + // Register handler for filter, whether remote subscription succeeded or not - node.filters[id] = filter.Filter{ + node.filters[filterID] = filter.Filter{ PeerID: *peerID, Topic: topic, ContentFilters: contentTopics, Chan: ch, } + return +} + +// UnsubscribeFilterByID removes a subscription to a filter node completely +// using the filterID returned when the subscription was created +func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error { + + var f filter.Filter + var ok bool + if f, ok = node.filters[filterID]; !ok { + return errors.New("filter not found") + } + + err := node.filter.Unsubscribe(ctx, f.Topic, f.ContentFilters, f.PeerID) + if err != nil { + return err + } + + close(f.Chan) + delete(node.filters, filterID) + return nil } -func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) { +// Unsubscribe filter removes content topics from a filter subscription. If all +// the contentTopics are removed the subscription is dropped completely +func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) error { // Remove local filter var idsToRemove []string for id, f := range node.filters { @@ -499,11 +546,14 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, conte for _, rId := range idsToRemove { for id := range node.filters { if id == rId { + close(node.filters[id].Chan) delete(node.filters, id) break } } } + + return nil } func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 08b9bb90..b523e5d2 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -22,13 +22,11 @@ import ( var log = logging.Logger("wakufilter") type ( - ContentFilterChan chan *protocol.Envelope - Filter struct { PeerID peer.ID Topic string ContentFilters []string - Chan ContentFilterChan + Chan chan *protocol.Envelope } // @TODO MAYBE MORE INFO? @@ -234,7 +232,7 @@ func (wf *WakuFilter) FilterListener() { // Having a FilterRequest struct, // select a peer with filter support, dial it, // and submit FilterRequest wrapped in FilterRPC -func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (string, *peer.ID, error) { +func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (requestID string, peer *peer.ID, err error) { var contentFilters []*pb.FilterRequest_ContentFilter for _, ct := range contentTopics { contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) @@ -246,61 +244,67 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics ContentFilters: contentFilters, } - peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) - if err == nil { - conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) - - if conn != nil { - defer conn.Close() - - // This is the only successful path to subscription - id := protocol.GenerateRequestId() - - writer := protoio.NewDelimitedWriter(conn) - filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} - log.Info("sending filterRPC: ", filterRPC) - err = writer.WriteMsg(filterRPC) - if err != nil { - log.Error("failed to write message", err) - return "", nil, err - } - return string(id), peer, nil - } else { - log.Error("failed to connect to remote peer") - return "", nil, err - } + peer, err = utils.SelectPeer(wf.h, string(FilterID_v20beta1)) + if err != nil { + return } - log.Info("error selecting peer: ", err) - return "", nil, err + var conn network.Stream + conn, err = wf.h.NewStream(ctx, *peer, FilterID_v20beta1) + if err != nil { + return + } + + defer conn.Close() + + // This is the only successful path to subscription + requestID = hex.EncodeToString(protocol.GenerateRequestId()) + + writer := protoio.NewDelimitedWriter(conn) + filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request} + log.Info("sending filterRPC: ", filterRPC) + err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) + return + } + + return } -func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) { +func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) error { conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) - if conn != nil { - defer conn.Close() - // This is the only successful path to subscription - id := protocol.GenerateRequestId() - - var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range contentTopics { - contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) - } - - request := pb.FilterRequest{ - Subscribe: false, - Topic: topic, - ContentFilters: contentFilters, - } - - writer := protoio.NewDelimitedWriter(conn) - filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} - err = writer.WriteMsg(filterRPC) - if err != nil { - log.Error("failed to write message", err) - } - } else { - log.Error("failed to connect to remote peer", err) + if err != nil { + return err } + + defer conn.Close() + + // This is the only successful path to subscription + id := protocol.GenerateRequestId() + + var contentFilters []*pb.FilterRequest_ContentFilter + for _, ct := range contentTopics { + contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) + } + + request := pb.FilterRequest{ + Subscribe: false, + Topic: topic, + ContentFilters: contentFilters, + } + + writer := protoio.NewDelimitedWriter(conn) + filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} + err = writer.WriteMsg(filterRPC) + if err != nil { + return err + } + + return nil +} + +func (wf *WakuFilter) Stop() { + wf.h.RemoveStreamHandler(FilterID_v20beta1) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 71bca63b..f46f6746 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -203,3 +203,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o return pushResponseRPC.Response, nil } + +func (w *WakuLightPush) Stop() { + w.h.RemoveStreamHandler(LightPushID_v20beta1) +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index c6c1e225..b70cf3f2 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -175,3 +175,7 @@ func GetTopic(topic *Topic) Topic { } return t } + +func (w *WakuRelay) Stop() { + w.host.RemoveStreamHandler(WakuRelayID_v200) +} diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 5a2d63c0..45560d25 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -622,3 +622,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } // TODO: queryWithAccounting + +func (w *WakuStore) Stop() { + w.h.RemoveStreamHandler(StoreID_v20beta3) +}