diff --git a/examples/basic2/main.go b/examples/basic2/main.go index c45ccfb0..d4837c25 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -53,7 +53,7 @@ func main() { } go writeLoop(ctx, wakuNode) - go readLoop(wakuNode) + go readLoop(ctx, wakuNode) // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) @@ -105,8 +105,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } } -func readLoop(wakuNode *node.WakuNode) { - sub, err := wakuNode.Subscribe(nil) +func readLoop(ctx context.Context, wakuNode *node.WakuNode) { + sub, err := wakuNode.Subscribe(ctx, nil) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 87577c02..1b5f67f0 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -23,7 +23,6 @@ type Chat struct { // Messages is a channel of messages received from other peers in the chat room Messages chan *pb.Chat2Message - sub *node.Subscription C chan *protocol.Envelope node *node.WakuNode @@ -49,20 +48,12 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic if useLightPush { chat.C = make(filter.ContentFilterChan) - - filterRequest := wpb.FilterRequest{ - ContentFilters: []*wpb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, - Topic: string(relay.GetTopic(nil)), - Subscribe: true, - } - - n.SubscribeFilter(ctx, filterRequest, chat.C) + n.SubscribeFilter(ctx, string(relay.GetTopic(nil)), []string{contentTopic}, chat.C) } else { - sub, err := n.Subscribe(nil) + sub, err := n.Subscribe(ctx, nil) if err != nil { return nil, err } - chat.C = sub.C } diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 2d1a54bd..a25f188a 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -84,12 +84,6 @@ func main() { // // Send FilterRequest from light node to full node - filterRequest := pb.FilterRequest{ - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, - Topic: string(pubSubTopic), - Subscribe: true, - } - filterChan := make(filter.ContentFilterChan) go func() { @@ -97,16 +91,15 @@ func main() { log.Info("Light node received msg, ", string(env.Message().Payload)) } }() - lightNode.SubscribeFilter(ctx, filterRequest, filterChan) + lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}, filterChan) go writeLoop(ctx, fullNode) - go readLoop(fullNode) + go readLoop(ctx, fullNode) go func() { // Unsubscribe filter after 5 seconds time.Sleep(5 * time.Second) - filterRequest.Subscribe = false - lightNode.UnsubscribeFilter(ctx, filterRequest) + lightNode.UnsubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}) }() // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) @@ -117,7 +110,6 @@ func main() { // shut the nodes down fullNode.Stop() lightNode.Stop() - } func randomHex(n int) (string, error) { @@ -136,7 +128,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { p.Data = []byte(wakuNode.ID() + ": " + msgContent) p.Key = &node.KeyInfo{Kind: node.None} - payload, err := p.Encode(version) + payload, _ := p.Encode(version) msg := &pb.WakuMessage{ Payload: payload, @@ -145,7 +137,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: timestamp, } - _, err = wakuNode.Publish(ctx, msg, nil) + _, err := wakuNode.Publish(ctx, msg, nil) if err != nil { log.Error("Error sending a message: ", err) } @@ -158,8 +150,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } } -func readLoop(wakuNode *node.WakuNode) { - sub, err := wakuNode.Subscribe(&pubSubTopic) +func readLoop(ctx context.Context, wakuNode *node.WakuNode) { + sub, err := wakuNode.Subscribe(ctx, &pubSubTopic) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/examples/peer_events/build/.gitignore b/examples/peer_events/build/.gitignore new file mode 100644 index 00000000..d6b7ef32 --- /dev/null +++ b/examples/peer_events/build/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/examples/peer_events/main.go b/examples/peer_events/main.go index acbf32e7..1335b4cb 100644 --- a/examples/peer_events/main.go +++ b/examples/peer_events/main.go @@ -211,8 +211,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } } -func readLoop(wakuNode *node.WakuNode) { - sub, err := wakuNode.Subscribe(&pubSubTopic) +func readLoop(ctx context.Context, wakuNode *node.WakuNode) { + sub, err := wakuNode.Subscribe(ctx, &pubSubTopic) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/waku/node.go b/waku/node.go index d1c7c750..9f30d1af 100644 --- a/waku/node.go +++ b/waku/node.go @@ -170,7 +170,7 @@ func Execute(options Options) { for _, t := range options.Relay.Topics { nodeTopic := relay.Topic(t) - _, err := wakuNode.Subscribe(&nodeTopic) + _, err := wakuNode.Subscribe(ctx, &nodeTopic) failOnErr(err, "Error subscring to topic") } diff --git a/waku/options.go b/waku/options.go index 7709db93..7072f195 100644 --- a/waku/options.go +++ b/waku/options.go @@ -17,7 +17,7 @@ type RelayOptions struct { type FilterOptions struct { Enable bool `long:"filter" description:"Enable filter protocol"` - Nodes []string `long:"filter-node" description:"Multiaddr of a peer to request content filtering of messages. Option may be repeated"` + Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"` } // LightpushOptions are settings used to enable the lightpush protocol. This is @@ -28,7 +28,7 @@ type FilterOptions struct { // broadcasted type LightpushOptions struct { Enable bool `long:"lightpush" description:"Enable lightpush protocol"` - Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer to request lightpush of published messages. Option may be repeated"` + Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer that supports lightpush protocol. Option may be repeated"` } // StoreOptions are settings used for enabling the store protocol, used to @@ -37,7 +37,7 @@ type LightpushOptions struct { type StoreOptions struct { Enable bool `long:"store" description:"Enable store protocol"` ShouldResume bool `long:"resume" description:"fix the gap in message history"` - Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"` + Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"` } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e22383de..9d09c817 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -225,7 +225,7 @@ func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) e w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) if shouldRelayMessages { - _, err := w.Subscribe(nil) + _, err := w.Subscribe(w.ctx, nil) if err != nil { return err } @@ -350,7 +350,7 @@ func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { return nil } -func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) { +func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. if node.relay == nil { @@ -407,7 +407,7 @@ func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) { close(subscription.C) subscription.mutex.Unlock() case <-nextMsgTicker.C: - msg, err := sub.Next(node.ctx) + msg, err := sub.Next(ctx) if err != nil { subscription.mutex.Lock() for _, subscription := range node.subscriptions[t] { @@ -437,57 +437,50 @@ func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) { // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters -func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequest, ch filter.ContentFilterChan) error { - // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - // ContentFilterChan takes MessagePush structs - - // Status: Implemented. - - // Sanity check for well-formed subscribe FilterRequest - //doAssert(request.subscribe, "invalid subscribe request") - - log.Info("SubscribeFilter, request: ", request) - - var id string - var err error - +// TODO: what's up with this channel?.......................... is it closed eventually? +func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string, ch filter.ContentFilterChan) error { if node.filter == nil { return errors.New("WakuFilter is not set") } - id, err = node.filter.Subscribe(ctx, request) + // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + // ContentFilterChan takes MessagePush structs + + id, peerID, err := node.filter.Subscribe(ctx, topic, contentTopics) if id == "" || err != nil { // Failed to subscribe - log.Error("remote subscription to filter failed", request) - //waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + log.Error("remote subscription to filter failed") return err } // Register handler for filter, whether remote subscription succeeded or not node.filters[id] = filter.Filter{ - Topic: request.Topic, - ContentFilters: request.ContentFilters, + PeerID: *peerID, + Topic: topic, + ContentFilters: contentTopics, Chan: ch, } return nil } -func (node *WakuNode) UnsubscribeFilter(ctx context.Context, request pb.FilterRequest) { - - log.Info("UnsubscribeFilter, request: ", request) - // Send message to full node in order to unsubscribe - node.filter.Unsubscribe(ctx, request) - +func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) { // Remove local filter var idsToRemove []string for id, f := range node.filters { + if f.Topic != topic { + continue + } + + // Send message to full node in order to unsubscribe + node.filter.Unsubscribe(ctx, topic, contentTopics, f.PeerID) + // Iterate filter entries to remove matching content topics // make sure we delete the content filter // if no more topics are left - for _, cfToDelete := range request.ContentFilters { + for _, cfToDelete := range contentTopics { for i, cf := range f.ContentFilters { - if cf.ContentTopic == cfToDelete.ContentTopic { + if cf == cfToDelete { l := len(f.ContentFilters) - 1 f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] f.ContentFilters = f.ContentFilters[:l] diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 6e43bdb1..08b9bb90 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -25,10 +25,12 @@ type ( ContentFilterChan chan *protocol.Envelope Filter struct { + PeerID peer.ID Topic string - ContentFilters []*pb.FilterRequest_ContentFilter + ContentFilters []string Chan ContentFilterChan } + // @TODO MAYBE MORE INFO? Filters map[string]Filter @@ -69,8 +71,8 @@ func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { // TODO: In case of no topics we should either trigger here for all messages, // or we should not allow such filter to exist in the first place. - for _, contentFilter := range filter.ContentFilters { - if msg.ContentTopic == contentFilter.ContentTopic { + for _, contentTopic := range filter.ContentFilters { + if msg.ContentTopic == contentTopic { filter.Chan <- envelope break } @@ -232,7 +234,18 @@ func (wf *WakuFilter) FilterListener() { // Having a FilterRequest struct, // select a peer with filter support, dial it, // and submit FilterRequest wrapped in FilterRPC -func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (string, error) { //.async, gcsafe.} { +func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (string, *peer.ID, error) { + var contentFilters []*pb.FilterRequest_ContentFilter + for _, ct := range contentTopics { + contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) + } + + request := pb.FilterRequest{ + Subscribe: true, + Topic: topic, + ContentFilters: contentFilters, + } + peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) if err == nil { conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) @@ -249,46 +262,45 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( err = writer.WriteMsg(filterRPC) if err != nil { log.Error("failed to write message", err) - return "", err + return "", nil, err } - return string(id), nil + return string(id), peer, nil } else { - // @TODO more sophisticated error handling here log.Error("failed to connect to remote peer") - //waku_filter_errors.inc(labelValues = [dialFailure]) - return "", err + return "", nil, err } - } else { - log.Info("error selecting peer: ", err) } - return "", nil + log.Info("error selecting peer: ", err) + return "", nil, err } -func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) { - // @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. - peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) - if err == nil { - conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) +func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) { + conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) + if conn != nil { + defer conn.Close() - if conn != nil { - defer conn.Close() + // This is the only successful path to subscription + id := protocol.GenerateRequestId() - // This is the only successful path to subscription - id := protocol.GenerateRequestId() + var contentFilters []*pb.FilterRequest_ContentFilter + for _, ct := range contentTopics { + contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) + } - writer := protoio.NewDelimitedWriter(conn) - filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} - err = writer.WriteMsg(filterRPC) - if err != nil { - log.Error("failed to write message", err) - } - } else { - // @TODO more sophisticated error handling here - log.Error("failed to connect to remote peer", err) - //waku_filter_errors.inc(labelValues = [dialFailure]) + request := pb.FilterRequest{ + Subscribe: false, + Topic: topic, + ContentFilters: contentFilters, + } + + writer := protoio.NewDelimitedWriter(conn) + filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} + err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) } } else { - log.Info("Error selecting peer: ", err) + log.Error("failed to connect to remote peer", err) } }