diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c565d575..a66c4f24 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -48,8 +48,15 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic } if useLightPush { - chat.C = make(filter.ContentFilterChan) - n.SubscribeFilter(ctx, string(relay.GetTopic(nil)), []string{contentTopic}, chat.C) + cf := filter.ContentFilter{ + Topic: string(relay.GetTopic(nil)), + ContentTopics: []string{contentTopic}, + } + var err error + _, chat.C, err = n.SubscribeFilter(ctx, cf) + if err != nil { + return nil, err + } } else { sub, err := n.Subscribe(ctx, nil) if err != nil { diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 1eb26026..bb0d21c3 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -77,7 +77,7 @@ func main() { opts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), - node.WithHostAddress([]net.Addr{hostAddr}), + node.WithHostAddress([]*net.TCPAddr{hostAddr}), node.WithWakuStore(false, true), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), } @@ -87,7 +87,7 @@ func main() { } if *filterFlag { - opts = append(opts, node.WithWakuFilter()) + opts = append(opts, node.WithWakuFilter(false)) } if *lightPushFlag || *lightPushNodeFlag != "" { diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 2d6fe161..8ac23c96 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -54,9 +54,9 @@ func main() { fullNode, err := node.New(ctx, node.WithPrivateKey(prvKey1), - node.WithHostAddress([]net.Addr{hostAddr1}), + node.WithHostAddress([]*net.TCPAddr{hostAddr1}), node.WithWakuRelay(), - node.WithWakuFilter(), + node.WithWakuFilter(true), ) err = fullNode.Start() @@ -66,8 +66,8 @@ func main() { lightNode, err := node.New(ctx, node.WithPrivateKey(prvKey2), - node.WithHostAddress([]net.Addr{hostAddr2}), - node.WithWakuFilter(), + node.WithHostAddress([]*net.TCPAddr{hostAddr2}), + node.WithWakuFilter(false), ) _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1) diff --git a/waku/node.go b/waku/node.go index 013d9e89..f78e1fa3 100644 --- a/waku/node.go +++ b/waku/node.go @@ -139,7 +139,7 @@ func Execute(options Options) { } if options.Filter.Enable { - nodeOpts = append(nodeOpts, node.WithWakuFilter()) + nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode)) } if options.Store.Enable { @@ -178,10 +178,12 @@ func Execute(options Options) { options.Relay.Topics = []string{string(relay.DefaultWakuTopic)} } - for _, t := range options.Relay.Topics { - nodeTopic := relay.Topic(t) - _, err := wakuNode.Subscribe(ctx, &nodeTopic) - failOnErr(err, "Error subscring to topic") + if !options.Relay.Disable { + for _, t := range options.Relay.Topics { + nodeTopic := relay.Topic(t) + _, err := wakuNode.Subscribe(ctx, &nodeTopic) + failOnErr(err, "Error subscring to topic") + } } for _, n := range options.StaticNodes { diff --git a/waku/options.go b/waku/options.go index 630c101a..04f92a49 100644 --- a/waku/options.go +++ b/waku/options.go @@ -16,8 +16,9 @@ type RelayOptions struct { } type FilterOptions struct { - Enable bool `long:"filter" description:"Enable filter protocol"` - Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"` + Enable bool `long:"filter" description:"Enable filter protocol"` + DisableFullNode bool `long:"no-subscribers" description:"Don't accept filter subscribers"` + Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"` } // LightpushOptions are settings used to enable the lightpush protocol. This is diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 573984cc..6c3ad738 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -155,9 +155,11 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) } - err := w.mountRelay(w.opts.enableRelay, w.opts.wOpts...) - if err != nil { - return err + if w.opts.enableRelay { + err := w.mountRelay(w.opts.wOpts...) + if err != nil { + return err + } } if w.opts.enableLightPush { @@ -242,15 +244,16 @@ func (w *WakuNode) Filter() *filter.WakuFilter { return w.filter } -func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) error { +func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) + if err != nil { + return err + } - if shouldRelayMessages { - _, err := w.Subscribe(w.ctx, nil) - if err != nil { - return err - } + _, err = w.Subscribe(w.ctx, nil) + if err != nil { + return err } // TODO: rlnRelay @@ -265,7 +268,7 @@ func (w *WakuNode) mountFilter() error { } } - w.filter = filter.NewWakuFilter(w.ctx, w.host, filterHandler) + w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, filterHandler) return nil } @@ -572,10 +575,6 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi } func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { - if node.relay == nil { - return nil, errors.New("WakuRelay hasn't been set") - } - if message == nil { return nil, errors.New("message can't be null") } @@ -584,6 +583,10 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi return node.LightPush(ctx, message, topic) } + if node.relay == nil { + return nil, errors.New("WakuRelay hasn't been set") + } + hash, err := node.relay.Publish(ctx, message, topic) if err != nil { return nil, err diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index cdb70c56..26d751b1 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -28,9 +28,10 @@ type WakuNodeParameters struct { privKey *crypto.PrivKey libP2POpts []libp2p.Option - enableRelay bool - enableFilter bool - wOpts []pubsub.Option + enableRelay bool + enableFilter bool + isFilterFullNode bool + wOpts []pubsub.Option enableStore bool shouldResume bool @@ -151,9 +152,10 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption { // WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption // accepts a list of WakuFilter gossipsub options to setup the protocol -func WithWakuFilter(opts ...pubsub.Option) WakuNodeOption { +func WithWakuFilter(fullNode bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true + params.isFilterFullNode = fullNode return nil } } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 93ba7872..3d0fbd6d 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -66,6 +66,7 @@ type ( ctx context.Context h host.Host subscribers []Subscriber + isFullNode bool pushHandler MessagePushHandler MsgC chan *protocol.Envelope } @@ -145,7 +146,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) - } else if filterRPCRequest.Request != nil { + } else if filterRPCRequest.Request != nil && wf.isFullNode { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { @@ -197,10 +198,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) { stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } + } else { + log.Error("can't serve request") + return } } -func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler MessagePushHandler) *WakuFilter { ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { log.Error(err) @@ -211,10 +215,17 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl wf.MsgC = make(chan *protocol.Envelope) wf.h = host wf.pushHandler = handler + wf.isFullNode = isFullNode wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) go wf.FilterListener() + if wf.isFullNode { + log.Info("Filter protocol started") + } else { + log.Info("Filter protocol started (only client mode)") + } + return wf } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 6d55546f..8654be72 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -42,6 +42,8 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) if relay != nil { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) log.Info("Light Push protocol started") + } else { + log.Info("Light Push protocol started (only client mode)") } return wakuLP