diff --git a/cmd/waku/node.go b/cmd/waku/node.go index fab591f7..57387dd1 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -382,15 +382,13 @@ func Execute(options NodeOptions) error { var wg sync.WaitGroup if options.Relay.Enable { - for nodeTopic := range pubSubTopicMap { + for nodeTopic, cTopics := range pubSubTopicMap { nodeTopic := nodeTopic - sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) + _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer()) if err != nil { return err } - sub.Unsubscribe() - if len(options.Rendezvous.Nodes) != 0 { // Register the node in rendezvous point iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes) @@ -549,17 +547,18 @@ func Execute(options NodeOptions) error { return nil } -func processTopics(options NodeOptions) (map[string]struct{}, error) { +func processTopics(options NodeOptions) (map[string][]string, error) { + //Using a map to avoid duplicate pub-sub topics that can result from autosharding // or same-topic being passed twice. - pubSubTopicMap := make(map[string]struct{}) + pubSubTopicMap := make(map[string][]string) for _, topic := range options.Relay.Topics.Value() { - pubSubTopicMap[topic] = struct{}{} + pubSubTopicMap[topic] = []string{} } for _, topic := range options.Relay.PubSubTopics.Value() { - pubSubTopicMap[topic] = struct{}{} + pubSubTopicMap[topic] = []string{} } //Get pubSub topics from contentTopics if they are as per autosharding @@ -569,11 +568,14 @@ func processTopics(options NodeOptions) (map[string]struct{}, error) { return nil, err } pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount) - pubSubTopicMap[pTopic.String()] = struct{}{} + if _, ok := pubSubTopicMap[pTopic.String()]; !ok { + pubSubTopicMap[pTopic.String()] = []string{} + } + pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopic) } //If no topics are passed, then use default waku topic. if len(pubSubTopicMap) == 0 { - pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{} + pubSubTopicMap[relay.DefaultWakuTopic] = []string{} } return pubSubTopicMap, nil diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index f466d2d3..af4f5d41 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -107,7 +107,7 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re var err error for _, topic := range topics { - err = r.node.Relay().Unsubscribe(req.Context(), topic) + err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) } else { @@ -129,18 +129,20 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ var err error var sub *relay.Subscription + var subs []*relay.Subscription var topicToSubscribe string for _, topic := range topics { if topic == "" { - sub, err = r.node.Relay().Subscribe(req.Context()) + subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic)) topicToSubscribe = relay.DefaultWakuTopic } else { - sub, err = r.node.Relay().SubscribeToTopic(req.Context(), topic) + subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic)) topicToSubscribe = topic } if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) } else { + sub = subs[0] sub.Unsubscribe() r.messagesMutex.Lock() r.messages[topic] = []*pb.WakuMessage{} diff --git a/cmd/waku/server/rest/runner.go b/cmd/waku/server/rest/runner.go index 7681dd2d..c1c4d310 100644 --- a/cmd/waku/server/rest/runner.go +++ b/cmd/waku/server/rest/runner.go @@ -11,7 +11,7 @@ type Adder func(msg *protocol.Envelope) type runnerService struct { broadcaster relay.Broadcaster - sub relay.Subscription + sub *relay.Subscription cancel context.CancelFunc adder Adder } @@ -26,7 +26,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService func (r *runnerService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel - r.sub = r.broadcaster.RegisterForAll(1024) + r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize)) for { select { case <-ctx.Done(): diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index 4d2b7c11..8986de53 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -13,6 +13,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -37,9 +38,9 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService { require.NoError(t, err) if isFullNode { - sub, err := n.Relay().SubscribeToTopic(context.Background(), testTopic) + sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic)) go func() { - for range sub.Ch { + for range sub[0].Ch { } }() require.NoError(t, err) @@ -62,14 +63,15 @@ func TestFilterSubscription(t *testing.T) { err = node.Start(context.Background()) require.NoError(t, err) - _, err = node.SubscribeToTopic(context.Background(), testTopic) + _, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic)) require.NoError(t, err) b2 := relay.NewBroadcaster(10) require.NoError(t, b2.Start(context.Background())) f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) f.SetHost(host) - err = f.Start(context.Background(), relay.NoopSubscription()) + sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + err = f.Start(context.Background(), sub) require.NoError(t, err) d := makeFilterService(t, true) diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index a5a6927f..ee0679fd 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -33,6 +33,11 @@ type RelayMessageArgs struct { Message *RPCWakuMessage `json:"message,omitempty"` } +// RelayAutoMessageArgs represents the requests used for posting messages +type RelayAutoMessageArgs struct { + Message *RPCWakuMessage `json:"message,omitempty"` +} + // TopicsArgs represents the lists of topics to use when subscribing / unsubscribing type TopicsArgs struct { Topics []string `json:"topics,omitempty"` @@ -120,28 +125,97 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, return nil } +// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription +// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. +func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { + + _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...)) + if err != nil { + r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err)) + return err + } + //TODO: Handle partial errors. + + *reply = true + return nil +} + +// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription +// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. +func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { + ctx := req.Context() + + err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...)) + if err != nil { + r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err)) + return err + } + //TODO: Handle partial errors. + *reply = true + return nil +} + +// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message +func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error { + var err error + msg := args.Message.toProto() + if msg == nil { + err := fmt.Errorf("invalid message format received") + r.log.Error("publishing message", zap.Error(err)) + return err + } + if msg.ContentTopic == "" { + err := fmt.Errorf("content-topic cannot be empty") + r.log.Error("publishing message", zap.Error(err)) + return err + } + if err = server.AppendRLNProof(r.node, msg); err != nil { + return err + } + + _, err = r.node.Relay().Publish(req.Context(), msg) + if err != nil { + r.log.Error("publishing message", zap.Error(err)) + return err + } + + *reply = true + return nil +} + +// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method +// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding. +func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { + sub, err := r.node.Relay().GetSubscription(args.Topic) + if err != nil { + return err + } + select { + case msg := <-sub.Ch: + *reply = append(*reply, ProtoToRPC(msg.Message())) + default: + break + } + return nil +} + // PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() + for _, topic := range args.Topics { var err error if topic == "" { - var sub *relay.Subscription - sub, err = r.node.Relay().Subscribe(ctx) - sub.Unsubscribe() - } else { - var sub *relay.Subscription - sub, err = r.node.Relay().SubscribeToTopic(ctx, topic) - if err != nil { - r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) - return err - } - sub.Unsubscribe() + topic = relay.DefaultWakuTopic } + var sub *relay.Subscription + subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err } + sub = subs[0] + sub.Unsubscribe() r.messagesMutex.Lock() r.messages[topic] = make([]*pb.WakuMessage, 0) r.messagesMutex.Unlock() @@ -155,7 +229,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { - err := r.node.Relay().Unsubscribe(ctx, topic) + err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) return err diff --git a/cmd/waku/server/rpc/runner.go b/cmd/waku/server/rpc/runner.go index a0b24c9a..e09e031b 100644 --- a/cmd/waku/server/rpc/runner.go +++ b/cmd/waku/server/rpc/runner.go @@ -9,7 +9,7 @@ type Adder func(msg *protocol.Envelope) type runnerService struct { broadcaster relay.Broadcaster - sub relay.Subscription + sub *relay.Subscription adder Adder } @@ -21,7 +21,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService } func (r *runnerService) Start() { - r.sub = r.broadcaster.RegisterForAll(1024) + r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize)) for envelope := range r.sub.Ch { r.adder(envelope) } diff --git a/examples/basic2/main.go b/examples/basic2/main.go index a5407bd7..1cad0b8f 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -16,6 +16,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -116,13 +117,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string } func readLoop(ctx context.Context, wakuNode *node.WakuNode, contentTopic string) { - sub, err := wakuNode.Relay().Subscribe(ctx) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) if err != nil { log.Error("Could not subscribe", zap.Error(err)) return } - for envelope := range sub.Ch { + for envelope := range sub[0].Ch { if envelope.Message().ContentTopic != contentTopic { continue } diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index b8f4e5f6..fcf7e221 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -3,7 +3,6 @@ package main import ( "chat2/pb" "context" - "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -24,7 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-zerokit-rln/rln" - "golang.org/x/crypto/pbkdf2" "google.golang.org/protobuf/proto" ) @@ -84,13 +82,13 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. } else { for _, topic := range topics { - sub, err := node.Relay().SubscribeToTopic(ctx, topic) + sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { chat.ui.ErrorMessage(err) } else { chat.C = make(chan *protocol.Envelope) go func() { - for e := range sub.Ch { + for e := range sub[0].Ch { chat.C <- e } }() @@ -356,12 +354,6 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Mess return msg, nil } -func generateSymKey(password string) []byte { - // AesKeyLength represents the length (in bytes) of an private key - AESKeyLength := 256 / 8 - return pbkdf2.Key([]byte(password), nil, 65356, AESKeyLength, sha256.New) -} - func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { defer c.wg.Done() diff --git a/examples/filter2/main.go b/examples/filter2/main.go index ec00f95b..b2b985e9 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -172,13 +172,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) { pubsubTopic := pubSubTopic.String() - sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic)) if err != nil { log.Error("Could not subscribe: ", err) return } - for value := range sub.Ch { + for value := range sub[0].Ch { payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None}) if err != nil { fmt.Println(err) diff --git a/examples/rln/main.go b/examples/rln/main.go index 3c566d8d..6b0c9907 100644 --- a/examples/rln/main.go +++ b/examples/rln/main.go @@ -142,13 +142,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } func readLoop(ctx context.Context, wakuNode *node.WakuNode) { - sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic.String()) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic.String())) if err != nil { log.Error("Could not subscribe", zap.Error(err)) return } - for envelope := range sub.Ch { + for envelope := range sub[0].Ch { if envelope.Message().ContentTopic != contentTopic.String() { continue } diff --git a/library/relay.go b/library/relay.go index 83751443..e77c49d5 100644 --- a/library/relay.go +++ b/library/relay.go @@ -69,18 +69,18 @@ func relaySubscribe(topic string) error { return nil } - subscription, err := wakuState.node.Relay().SubscribeToTopic(context.Background(), topicToSubscribe) + subscription, err := wakuState.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(topicToSubscribe)) if err != nil { return err } - relaySubscriptions[topicToSubscribe] = subscription + relaySubscriptions[topicToSubscribe] = subscription[0] go func(subscription *relay.Subscription) { for envelope := range subscription.Ch { send("message", toSubscriptionMessage(envelope)) } - }(subscription) + }(subscription[0]) return nil } @@ -123,5 +123,5 @@ func RelayUnsubscribe(topic string) error { delete(relaySubscriptions, topicToUnsubscribe) - return wakuState.node.Relay().Unsubscribe(context.Background(), topicToUnsubscribe) + return wakuState.node.Relay().Unsubscribe(context.Background(), protocol.NewContentFilter(topicToUnsubscribe)) } diff --git a/tests/connection_test.go b/tests/connection_test.go index 28ca361f..1bc98450 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -10,7 +10,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/payload" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -39,10 +41,10 @@ func TestBasicSendingReceiving(t *testing.T) { require.NoError(t, write(ctx, wakuNode, "test")) - sub, err := wakuNode.Relay().Subscribe(ctx) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) - value := <-sub.Ch + value := <-sub[0].Ch payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None}) require.NoError(t, err) @@ -70,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro Timestamp: timestamp, } - _, err = wakuNode.Relay().Publish(ctx, msg) + _, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic) return err } diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index 398e22fe..8a252734 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -16,5 +16,5 @@ type Service interface { type ReceptorService interface { SetHost(h host.Host) Stop() - Start(context.Context, relay.Subscription) error + Start(context.Context, *relay.Subscription) error } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8d4540af..c19e6aad 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -679,7 +679,7 @@ func (w *WakuNode) mountDiscV5() error { return err } -func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error { +func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) error { err := w.store.Start(ctx, sub) if err != nil { w.log.Error("starting store", zap.Error(err)) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 71a3d101..333282e6 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -54,11 +55,11 @@ func TestWakuNode2(t *testing.T) { err = wakuNode.Start(ctx) require.NoError(t, err) - _, err = wakuNode.Relay().SubscribeToTopic(ctx, "waku/rs/1/1") + _, err = wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter("waku/rs/1/1")) require.NoError(t, err) time.Sleep(time.Second * 1) - err = wakuNode.Relay().Unsubscribe(ctx, "waku/rs/1/1") + err = wakuNode.Relay().Unsubscribe(ctx, protocol.NewContentFilter("waku/rs/1/1")) require.NoError(t, err) defer wakuNode.Stop() @@ -151,9 +152,9 @@ func Test500(t *testing.T) { time.Sleep(2 * time.Second) - sub1, err := wakuNode1.Relay().Subscribe(ctx) + sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) - sub2, err := wakuNode1.Relay().Subscribe(ctx) + sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) wg := sync.WaitGroup{} @@ -168,7 +169,7 @@ func Test500(t *testing.T) { select { case <-ticker.C: require.Fail(t, "Timeout Sub1") - case msg := <-sub1.Ch: + case msg := <-sub1[0].Ch: if msg == nil { return } @@ -189,7 +190,7 @@ func Test500(t *testing.T) { select { case <-ticker.C: require.Fail(t, "Timeout Sub2") - case msg := <-sub2.Ch: + case msg := <-sub2[0].Ch: if msg == nil { return } @@ -206,7 +207,7 @@ func Test500(t *testing.T) { msg := createTestMsg(0) msg.Payload = int2Bytes(i) msg.Timestamp = int64(i) - if _, err := wakuNode2.Relay().Publish(ctx, msg); err != nil { + if _, err := wakuNode2.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { require.Fail(t, "Could not publish all messages") } time.Sleep(5 * time.Millisecond) @@ -234,9 +235,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode1.Stop() - subs, err := wakuNode1.Relay().Subscribe(ctx) + subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) - subs.Unsubscribe() + defer subs[0].Unsubscribe() // NODE2: Filter Client/Store db, err := sqlite.NewDB(":memory:", false, utils.Logger()) @@ -286,7 +287,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { time.Sleep(500 * time.Millisecond) - if _, err := wakuNode1.Relay().Publish(ctx, msg); err != nil { + if _, err := wakuNode1.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { require.Fail(t, "Could not publish all messages") } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2f85d9c1..9e364e21 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -24,7 +24,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/transport/tcp" libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" "github.com/multiformats/go-multiaddr" - ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/protocol/filter" @@ -232,7 +231,7 @@ func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption { } // WithAdvertiseAddresses is a WakuNodeOption that allows overriding the address used in the waku node with custom value -func WithAdvertiseAddresses(advertiseAddrs ...ma.Multiaddr) WakuNodeOption { +func WithAdvertiseAddresses(advertiseAddrs ...multiaddr.Multiaddr) WakuNodeOption { return func(params *WakuNodeParameters) error { params.advertiseAddrs = advertiseAddrs return WithMultiaddress(advertiseAddrs...)(params) diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index c230f6f3..ef9e1bdf 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -2,6 +2,9 @@ package protocol import "golang.org/x/exp/maps" +type PubsubTopicStr = string +type ContentTopicStr = string + type ContentTopicSet map[string]struct{} func NewContentTopicSet(contentTopics ...string) ContentTopicSet { @@ -28,3 +31,40 @@ func (cf ContentFilter) ContentTopicsList() []string { func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter { return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)} } + +func (cf ContentFilter) Equals(cf1 ContentFilter) bool { + if cf.PubsubTopic != cf1.PubsubTopic || + len(cf.ContentTopics) != len(cf1.ContentTopics) { + return false + } + for topic := range cf.ContentTopics { + _, ok := cf1.ContentTopics[topic] + if !ok { + return false + } + } + return true +} + +// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics +func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { + pubSubTopicMap := make(map[string][]string) + + if contentFilter.PubsubTopic != "" { + pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() + } else { + //Parse the content-Topics to figure out shards. + for _, cTopicString := range contentFilter.ContentTopicsList() { + pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString) + if err != nil { + return nil, err + } + _, ok := pubSubTopicMap[pTopicStr] + if !ok { + pubSubTopicMap[pTopicStr] = []string{} + } + pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) + } + } + return pubSubTopicMap, nil +} diff --git a/waku/v2/protocol/content_topic.go b/waku/v2/protocol/content_topic.go index 38caed6d..9c737070 100644 --- a/waku/v2/protocol/content_topic.go +++ b/waku/v2/protocol/content_topic.go @@ -10,7 +10,7 @@ import ( // DefaultContentTopic is the default content topic used in Waku network if no content topic is specified. const DefaultContentTopic = "/waku/2/default-content/proto" -var ErrInvalidFormat = errors.New("invalid format") +var ErrInvalidFormat = errors.New("invalid content topic format") var ErrMissingGeneration = errors.New("missing part: generation") var ErrInvalidGeneration = errors.New("generation should be a number") diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index b9df51ed..e1c1bb6a 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -214,29 +214,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } -// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics -func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) { - pubSubTopicMap := make(map[string][]string) - - if contentFilter.PubsubTopic != "" { - pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() - } else { - //Parse the content-Topics to figure out shards. - for _, cTopicString := range contentFilter.ContentTopicsList() { - pTopicStr, err := protocol.GetPubSubTopicFromContentTopic(cTopicString) - if err != nil { - return nil, err - } - _, ok := pubSubTopicMap[pTopicStr] - if !ok { - pubSubTopicMap[pTopicStr] = []string{} - } - pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) - } - } - return pubSubTopicMap, nil -} - // Subscribe setups a subscription to receive messages that match a specific content filter // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. @@ -273,7 +250,8 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot } } - pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { return nil, err } @@ -454,7 +432,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr return nil, err } - pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 685a15ea..f1b9b8d4 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -63,10 +63,10 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay. err = relay.Start(context.Background()) s.Require().NoError(err) - sub, err := relay.SubscribeToTopic(context.Background(), topic) + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) s.Require().NoError(err) - return relay, sub, host, broadcaster + return relay, sub[0], host, broadcaster } func (s *FilterTestSuite) makeWakuFilterLightNode(start bool, withBroadcaster bool) *WakuFilterLightNode { @@ -97,7 +97,7 @@ func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) node2Filter.SetHost(host) - sub := broadcaster.Register(topic) + sub := broadcaster.Register(protocol.NewContentFilter(topic)) err := node2Filter.Start(s.ctx, sub) s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 060ea3e3..22166570 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -31,7 +31,7 @@ const peerHasNoSubscription = "peer has no subscriptions" type ( WakuFilterFullNode struct { h host.Host - msgSub relay.Subscription + msgSub *relay.Subscription metrics Metrics log *zap.Logger *protocol.CommonService @@ -66,13 +66,13 @@ func (wf *WakuFilterFullNode) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilterFullNode) Start(ctx context.Context, sub relay.Subscription) error { +func (wf *WakuFilterFullNode) Start(ctx context.Context, sub *relay.Subscription) error { return wf.CommonService.Start(ctx, func() error { return wf.start(sub) }) } -func (wf *WakuFilterFullNode) start(sub relay.Subscription) error { +func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error { wf.h.SetStreamHandlerMatch(FilterSubscribeID_v20beta1, protocol.PrefixTextMatch(string(FilterSubscribeID_v20beta1)), wf.onRequest(wf.Context())) wf.msgSub = sub diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index f2079d40..7b085d95 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -51,7 +51,7 @@ type ( h host.Host pm *peermanager.PeerManager isFullNode bool - msgSub relay.Subscription + msgSub *relay.Subscription metrics Metrics log *zap.Logger @@ -89,13 +89,13 @@ func (wf *WakuFilter) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilter) Start(ctx context.Context, sub relay.Subscription) error { +func (wf *WakuFilter) Start(ctx context.Context, sub *relay.Subscription) error { return wf.CommonService.Start(ctx, func() error { return wf.start(sub) }) } -func (wf *WakuFilter) start(sub relay.Subscription) error { +func (wf *WakuFilter) start(sub *relay.Subscription) error { wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(wf.Context())) wf.msgSub = sub wf.WaitGroup().Add(1) diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index f40783a2..a4aa32bb 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -12,11 +12,15 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) +var testTopic = "/waku/2/go/filter/test" +var testContentTopic = "TopicA" + func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -29,10 +33,10 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (* err = relay.Start(context.Background()) require.NoError(t, err) - sub, err := relay.SubscribeToTopic(context.Background(), topic) + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) require.NoError(t, err) - return relay, sub, host + return relay, sub[0], host } func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { @@ -46,7 +50,8 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { require.NoError(t, b.Start(context.Background())) filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) filter.SetHost(host) - err = filter.Start(context.Background(), relay.NoopSubscription()) + sub := relay.NewSubscription(protocol.NewContentFilter(testTopic, testContentTopic)) + err = filter.Start(context.Background(), sub) require.NoError(t, err) return filter, host @@ -66,9 +71,6 @@ func TestWakuFilter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds defer cancel() - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" - node1, host1 := makeWakuFilter(t) defer node1.Stop() @@ -80,7 +82,7 @@ func TestWakuFilter(t *testing.T) { node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) + sub := broadcaster.Register(protocol.NewContentFilter(testTopic)) err := node2Filter.Start(ctx, sub) require.NoError(t, err) @@ -156,9 +158,6 @@ func TestWakuFilterPeerFailure(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds defer cancel() - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" - node1, host1 := makeWakuFilter(t) broadcaster := relay.NewBroadcaster(10) @@ -171,7 +170,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { require.NoError(t, broadcaster2.Start(context.Background())) node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger(), WithTimeout(3*time.Second)) node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) + sub := broadcaster.Register(protocol.NewContentFilter(testTopic)) err := node2Filter.Start(ctx, sub) require.NoError(t, err) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index e0bda40f..ab4455a8 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -33,10 +33,10 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su err = relay.Start(context.Background()) require.NoError(t, err) - sub, err := relay.SubscribeToTopic(context.Background(), pusubTopic) + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(pusubTopic)) require.NoError(t, err) - return relay, sub, host + return relay, sub[0], host } // Node1: Relay diff --git a/waku/v2/protocol/noise/pairing_relay_messenger.go b/waku/v2/protocol/noise/pairing_relay_messenger.go index edb14192..489fc446 100644 --- a/waku/v2/protocol/noise/pairing_relay_messenger.go +++ b/waku/v2/protocol/noise/pairing_relay_messenger.go @@ -4,6 +4,7 @@ import ( "context" n "github.com/waku-org/go-noise" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -16,7 +17,7 @@ type NoiseMessenger interface { } type contentTopicSubscription struct { - broadcastSub relay.Subscription + broadcastSub *relay.Subscription msgChan chan *pb.WakuMessage } @@ -39,16 +40,19 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic topic = relay.DefaultWakuTopic } - subs, err := r.SubscribeToTopic(ctx, topic) + subs, err := r.Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { return nil, err } - + //Note: Safely assuming 0th index as subscription is based on pubSubTopic. + // Once this API is changed to support subscription based on contentTopics, this logic should also be changed. + sub := subs[0] ctx, cancel := context.WithCancel(ctx) wr := &NoiseWakuRelay{ - relay: r, - relaySub: subs, + relay: r, + + relaySub: sub, cancel: cancel, timesource: timesource, broadcaster: relay.NewBroadcaster(1024), @@ -65,10 +69,10 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic for { select { case <-ctx.Done(): - subs.Unsubscribe() + sub.Unsubscribe() wr.broadcaster.Stop() return - case envelope := <-subs.Ch: + case envelope := <-sub.Ch: if envelope != nil { wr.broadcaster.Submit(envelope) } @@ -84,7 +88,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c msgChan: make(chan *pb.WakuMessage, 1024), } - broadcastSub := r.broadcaster.RegisterForAll(1024) + broadcastSub := r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize)) sub.broadcastSub = broadcastSub subscriptionCh := r.subscriptionChPerContentTopic[contentTopic] diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go index aea85e26..dccc8750 100644 --- a/waku/v2/protocol/relay/broadcast.go +++ b/waku/v2/protocol/relay/broadcast.go @@ -9,29 +9,58 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" ) -type chStore struct { +type BroadcasterParameters struct { + dontConsume bool //Indicates whether to consume messages from subscription or drop + chLen int +} + +type BroadcasterOption func(*BroadcasterParameters) + +// WithoutConsumer option let's a user subscribe to a broadcaster without consuming messages received. +// This is useful for a relayNode where only a subscribe is required in order to relay messages in gossipsub network. +func DontConsume() BroadcasterOption { + return func(params *BroadcasterParameters) { + params.dontConsume = true + } +} + +// WithBufferSize option let's a user set channel buffer to be set. +func WithBufferSize(size int) BroadcasterOption { + return func(params *BroadcasterParameters) { + params.chLen = size + } +} + +// DefaultBroadcasterOptions specifies default options for broadcaster +func DefaultBroadcasterOptions() []BroadcasterOption { + return []BroadcasterOption{ + WithBufferSize(0), + } +} + +type Subscriptions struct { mu sync.RWMutex - topicToChans map[string]map[int]chan *protocol.Envelope + topicsToSubs map[string]map[int]*Subscription //map of pubSubTopic to subscriptions id int } -func newChStore() chStore { - return chStore{ - topicToChans: make(map[string]map[int]chan *protocol.Envelope), +func newSubStore() Subscriptions { + return Subscriptions{ + topicsToSubs: make(map[string]map[int]*Subscription), } } -func (s *chStore) getNewCh(topic string, chLen int) Subscription { +func (s *Subscriptions) createNewSubscription(contentFilter protocol.ContentFilter, dontConsume bool, chLen int) *Subscription { ch := make(chan *protocol.Envelope, chLen) s.mu.Lock() defer s.mu.Unlock() s.id++ - // - if s.topicToChans[topic] == nil { - s.topicToChans[topic] = make(map[int]chan *protocol.Envelope) + pubsubTopic := contentFilter.PubsubTopic + if s.topicsToSubs[pubsubTopic] == nil { + s.topicsToSubs[pubsubTopic] = make(map[int]*Subscription) } id := s.id - s.topicToChans[topic][id] = ch - return Subscription{ + sub := Subscription{ + ID: id, // read only channel,will not block forever, returns once closed. Ch: ch, // Unsubscribe function is safe, can be called multiple times @@ -39,21 +68,25 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription { Unsubscribe: func() { s.mu.Lock() defer s.mu.Unlock() - if s.topicToChans[topic] == nil { + if s.topicsToSubs[pubsubTopic] == nil { return } - if ch := s.topicToChans[topic][id]; ch != nil { - close(ch) - delete(s.topicToChans[topic], id) + if sub := s.topicsToSubs[pubsubTopic][id]; sub != nil { + close(sub.Ch) + delete(s.topicsToSubs[pubsubTopic], id) } }, + contentFilter: contentFilter, + noConsume: dontConsume, } + s.topicsToSubs[pubsubTopic][id] = &sub + return &sub } -func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) { +func (s *Subscriptions) broadcast(ctx context.Context, m *protocol.Envelope) { s.mu.RLock() defer s.mu.RUnlock() - for _, ch := range s.topicToChans[m.PubsubTopic()] { + for _, sub := range s.topicsToSubs[m.PubsubTopic()] { select { // using ctx.Done for returning on cancellation is needed // reason: @@ -62,36 +95,40 @@ func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) { // this will also block the chStore close function as it uses same mutex case <-ctx.Done(): return - case ch <- m: + default: + sub.Submit(ctx, m) } } - // send to all registered subscribers - for _, ch := range s.topicToChans[""] { + + // send to all wildcard subscribers + for _, sub := range s.topicsToSubs[""] { select { case <-ctx.Done(): return - case ch <- m: + default: + sub.Submit(ctx, m) } } } -func (s *chStore) close() { +func (s *Subscriptions) close() { s.mu.Lock() defer s.mu.Unlock() - for _, chans := range s.topicToChans { - for _, ch := range chans { - close(ch) + for _, subs := range s.topicsToSubs { + for _, sub := range subs { + close(sub.Ch) } } - s.topicToChans = nil + s.topicsToSubs = nil } // Broadcaster is used to create a fanout for an envelope that will be received by any subscriber interested in the topic of the message type Broadcaster interface { Start(ctx context.Context) error Stop() - Register(topic string, chLen ...int) Subscription - RegisterForAll(chLen ...int) Subscription + Register(contentFilter protocol.ContentFilter, opts ...BroadcasterOption) *Subscription + RegisterForAll(opts ...BroadcasterOption) *Subscription + UnRegister(pubsubTopic string) Submit(*protocol.Envelope) } @@ -106,8 +143,8 @@ type broadcaster struct { cancel context.CancelFunc input chan *protocol.Envelope // - chStore chStore - running atomic.Bool + subscriptions Subscriptions + running atomic.Bool } // NewBroadcaster creates a new instance of a broadcaster @@ -124,7 +161,7 @@ func (b *broadcaster) Start(ctx context.Context) error { } ctx, cancel := context.WithCancel(ctx) b.cancel = cancel - b.chStore = newChStore() + b.subscriptions = newSubStore() b.input = make(chan *protocol.Envelope, b.bufLen) go b.run(ctx) return nil @@ -137,7 +174,7 @@ func (b *broadcaster) run(ctx context.Context) { return case msg, ok := <-b.input: if ok { - b.chStore.broadcast(ctx, msg) + b.subscriptions.broadcast(ctx, msg) } } } @@ -149,28 +186,41 @@ func (b *broadcaster) Stop() { return } // cancel must be before chStore.close(), so that broadcast releases lock before chStore.close() acquires it. - b.cancel() // exit the run loop, - b.chStore.close() // close all channels that we send to - close(b.input) // close input channel + b.cancel() // exit the run loop, + b.subscriptions.close() // close all channels that we send to + close(b.input) // close input channel } -// Register returns a subscription for an specific topic -func (b *broadcaster) Register(topic string, chLen ...int) Subscription { - return b.chStore.getNewCh(topic, getChLen(chLen)) +// Register returns a subscription for an specific pubsub topic and/or list of contentTopics +func (b *broadcaster) Register(contentFilter protocol.ContentFilter, opts ...BroadcasterOption) *Subscription { + params := b.ProcessOpts(opts...) + return b.subscriptions.createNewSubscription(contentFilter, params.dontConsume, params.chLen) +} + +func (b *broadcaster) ProcessOpts(opts ...BroadcasterOption) *BroadcasterParameters { + params := new(BroadcasterParameters) + optList := DefaultBroadcasterOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + return params +} + +// UnRegister removes all subscriptions for an specific pubsub topic +func (b *broadcaster) UnRegister(pubsubTopic string) { + subs := b.subscriptions.topicsToSubs[pubsubTopic] + if len(subs) > 0 { + for _, sub := range subs { + sub.Unsubscribe() + } + } } // RegisterForAll returns a subscription for all topics -func (b *broadcaster) RegisterForAll(chLen ...int) Subscription { - - return b.chStore.getNewCh("", getChLen(chLen)) -} - -func getChLen(chLen []int) int { - l := 0 - if len(chLen) > 0 { - l = chLen[0] - } - return l +func (b *broadcaster) RegisterForAll(opts ...BroadcasterOption) *Subscription { + params := b.ProcessOpts(opts...) + return b.subscriptions.createNewSubscription(protocol.NewContentFilter(""), params.dontConsume, params.chLen) } // Submit is used to broadcast messages to subscribers. It only accepts value when running. diff --git a/waku/v2/protocol/relay/broadcast_test.go b/waku/v2/protocol/relay/broadcast_test.go index 59bd26b9..55422d22 100644 --- a/waku/v2/protocol/relay/broadcast_test.go +++ b/waku/v2/protocol/relay/broadcast_test.go @@ -46,7 +46,7 @@ func TestBroadcastSpecificTopic(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) - sub := b.Register("abc") + sub := b.Register(protocol.NewContentFilter("abc")) go func() { defer wg.Done() @@ -66,7 +66,7 @@ func TestBroadcastSpecificTopic(t *testing.T) { func TestBroadcastCleanup(t *testing.T) { b := NewBroadcaster(100) require.NoError(t, b.Start(context.Background())) - sub := b.Register("test") + sub := b.Register(protocol.NewContentFilter("test")) b.Stop() <-sub.Ch sub.Unsubscribe() @@ -78,7 +78,7 @@ func TestBroadcastUnregisterSub(t *testing.T) { require.NoError(t, b.Start(context.Background())) subForAll := b.RegisterForAll() // unregister before submit - specificSub := b.Register("abc") + specificSub := b.Register(protocol.NewContentFilter("abc")) specificSub.Unsubscribe() // env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc") diff --git a/waku/v2/protocol/relay/config.go b/waku/v2/protocol/relay/config.go new file mode 100644 index 00000000..61560d2f --- /dev/null +++ b/waku/v2/protocol/relay/config.go @@ -0,0 +1,130 @@ +package relay + +import ( + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/hash" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" +) + +var DefaultRelaySubscriptionBufferSize int = 1024 + +type RelaySubscribeParameters struct { + dontConsume bool +} + +type RelaySubscribeOption func(*RelaySubscribeParameters) error + +// WithoutConsumer option let's a user subscribe to relay without consuming messages received. +// This is useful for a relayNode where only a subscribe is required in order to relay messages in gossipsub network. +func WithoutConsumer() RelaySubscribeOption { + return func(params *RelaySubscribeParameters) error { + params.dontConsume = true + return nil + } +} + +func msgIDFn(pmsg *pubsub_pb.Message) string { + return string(hash.SHA256(pmsg.Data)) +} + +func (w *WakuRelay) setDefaultPeerScoreParams() { + w.peerScoreParams = &pubsub.PeerScoreParams{ + Topics: make(map[string]*pubsub.TopicScoreParams), + DecayInterval: 12 * time.Second, // how often peer scoring is updated + DecayToZero: 0.01, // below this we consider the parameter to be zero + RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects + // p5: application specific, unset + AppSpecificScore: func(p peer.ID) float64 { + return 0 + }, + AppSpecificWeight: 0.0, + // p6: penalizes peers sharing more than threshold ips + IPColocationFactorWeight: -50, + IPColocationFactorThreshold: 5.0, + // p7: penalizes bad behaviour (weight and decay) + BehaviourPenaltyWeight: -10, + BehaviourPenaltyDecay: 0.986, + } + w.peerScoreThresholds = &pubsub.PeerScoreThresholds{ + GossipThreshold: -100, // no gossip is sent to peers below this score + PublishThreshold: -1000, // no self-published msgs are sent to peers below this score + GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score + OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. + } +} + +func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option { + + cfg := pubsub.DefaultGossipSubParams() + cfg.PruneBackoff = time.Minute + cfg.UnsubscribeBackoff = 5 * time.Second + cfg.GossipFactor = 0.25 + cfg.D = waku_proto.GossipSubOptimalFullMeshSize + cfg.Dlo = 4 + cfg.Dhi = 12 + cfg.Dout = 3 + cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize + cfg.HeartbeatInterval = time.Second + cfg.HistoryLength = 6 + cfg.HistoryGossip = 3 + cfg.FanoutTTL = time.Minute + + w.setDefaultPeerScoreParams() + + w.setDefaultTopicParams() + return []pubsub.Option{ + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), + pubsub.WithNoAuthor(), + pubsub.WithMessageIdFn(msgIDFn), + pubsub.WithGossipSubProtocols( + []protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID}, + func(feat pubsub.GossipSubFeature, proto protocol.ID) bool { + switch feat { + case pubsub.GossipSubFeatureMesh: + return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 || proto == WakuRelayID_v200 + case pubsub.GossipSubFeaturePX: + return proto == pubsub.GossipSubID_v11 || proto == WakuRelayID_v200 + default: + return false + } + }, + ), + pubsub.WithGossipSubParams(cfg), + pubsub.WithFloodPublish(true), + pubsub.WithSeenMessagesTTL(2 * time.Minute), + pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds), + pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second), + } +} + +func (w *WakuRelay) setDefaultTopicParams() { + w.topicParams = &pubsub.TopicScoreParams{ + TopicWeight: 1, + // p1: favours peers already in the mesh + TimeInMeshWeight: 0.01, + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 10.0, + // p2: rewards fast peers + FirstMessageDeliveriesWeight: 1.0, + FirstMessageDeliveriesDecay: 0.5, + FirstMessageDeliveriesCap: 10.0, + // p3: penalizes lazy peers. safe low value + MeshMessageDeliveriesWeight: 0, + MeshMessageDeliveriesDecay: 0, + MeshMessageDeliveriesCap: 0, + MeshMessageDeliveriesThreshold: 0, + MeshMessageDeliveriesWindow: 0, + MeshMessageDeliveriesActivation: 0, + // p3b: tracks history of prunes + MeshFailurePenaltyWeight: 0, + MeshFailurePenaltyDecay: 0, + // p4: penalizes invalid messages. highly penalize peers sending wrong messages + InvalidMessageDeliveriesWeight: -100.0, + InvalidMessageDeliveriesDecay: 0.5, + } +} diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index 3f7cb34d..a7bb6a36 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -1,32 +1,57 @@ package relay -import "github.com/waku-org/go-waku/waku/v2/protocol" +import ( + "context" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "golang.org/x/exp/slices" +) // Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic. type Subscription struct { - Unsubscribe func() - Ch <-chan *protocol.Envelope + ID int + Unsubscribe func() //for internal use only. For relay Subscription use relay protocol's unsubscribe + Ch chan *protocol.Envelope + contentFilter protocol.ContentFilter + subType SubscriptionType + noConsume bool } -// NoopSubscription creates a noop subscription that will not receive any envelope -func NoopSubscription() Subscription { +type SubscriptionType int + +const ( + SpecificContentTopics SubscriptionType = iota + AllContentTopics +) + +// Submit allows a message to be submitted for a subscription +func (s *Subscription) Submit(ctx context.Context, msg *protocol.Envelope) { + //Filter and notify + // - if contentFilter doesn't have a contentTopic + // - if contentFilter has contentTopics and it matches with message + if !s.noConsume && (len(s.contentFilter.ContentTopicsList()) == 0 || + (len(s.contentFilter.ContentTopicsList()) > 0 && slices.Contains[string](s.contentFilter.ContentTopicsList(), msg.Message().ContentTopic))) { + select { + case <-ctx.Done(): + return + case s.Ch <- msg: + } + } +} + +// NewSubscription creates a subscription that will only receive messages based on the contentFilter +func NewSubscription(contentFilter protocol.ContentFilter) *Subscription { ch := make(chan *protocol.Envelope) - close(ch) - return Subscription{ - Unsubscribe: func() {}, - Ch: ch, - } -} - -// ArraySubscription creates a subscription for a list of envelopes -func ArraySubscription(msgs []*protocol.Envelope) Subscription { - ch := make(chan *protocol.Envelope, len(msgs)) - for _, msg := range msgs { - ch <- msg - } - close(ch) - return Subscription{ - Unsubscribe: func() {}, - Ch: ch, + var subType SubscriptionType + if len(contentFilter.ContentTopicsList()) == 0 { + subType = AllContentTopics + } + return &Subscription{ + Unsubscribe: func() { + close(ch) + }, + Ch: ch, + contentFilter: contentFilter, + subType: subType, } } diff --git a/waku/v2/protocol/relay/topic_events.go b/waku/v2/protocol/relay/topic_events.go new file mode 100644 index 00000000..60261a3b --- /dev/null +++ b/waku/v2/protocol/relay/topic_events.go @@ -0,0 +1,102 @@ +package relay + +import ( + "context" + "errors" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + "go.uber.org/zap" +) + +// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created +type EvtRelaySubscribed struct { + Topic string + TopicInst *pubsub.Topic +} + +// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed +type EvtRelayUnsubscribed struct { + Topic string +} + +type PeerTopicState int + +const ( + PEER_JOINED = iota + PEER_LEFT +) + +type EvtPeerTopic struct { + PubsubTopic string + PeerID peer.ID + State PeerTopicState +} + +// Events returns the event bus on which WakuRelay events will be emitted +func (w *WakuRelay) Events() event.Bus { + return w.events +} + +func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) { + handler, err := topic.EventHandler() + if err != nil { + return nil, err + } + w.WaitGroup().Add(1) + go w.topicEventPoll(topic.String(), handler) + return handler, nil +} + +func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) { + defer w.WaitGroup().Done() + for { + evt, err := handler.NextPeerEvent(w.Context()) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + break + } + w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err)) + continue + } + if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done. + break + } + if evt.Type == pubsub.PeerJoin { + w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED}) + if err != nil { + w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err)) + } + } else if evt.Type == pubsub.PeerLeave { + w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT}) + if err != nil { + w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err)) + } + } else { + w.log.Error("unknown event type received", zap.String("topic", topic), + zap.Int("eventType", int(evt.Type))) + } + } +} + +func (w *WakuRelay) CreateEventEmitters() error { + var err error + w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed)) + if err != nil { + return err + } + w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed)) + if err != nil { + return err + } + + w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic)) + if err != nil { + return err + } + return nil +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 6d19afc7..ae69fae8 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -3,9 +3,7 @@ package relay import ( "context" "errors" - "fmt" "sync" - "time" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" @@ -17,9 +15,7 @@ import ( proto "google.golang.org/protobuf/proto" pubsub "github.com/libp2p/go-libp2p-pubsub" - pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/hash" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -65,40 +61,13 @@ type WakuRelay struct { EvtRelayUnsubscribed event.Emitter EvtPeerTopic event.Emitter } - + contentSubs map[string]map[int]*Subscription *waku_proto.CommonService } -// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created -type EvtRelaySubscribed struct { - Topic string - TopicInst *pubsub.Topic -} - -// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed -type EvtRelayUnsubscribed struct { - Topic string -} - -type PeerTopicState int - -const ( - PEER_JOINED = iota - PEER_LEFT -) - -type EvtPeerTopic struct { - PubsubTopic string - PeerID peer.ID - State PeerTopicState -} - -func msgIDFn(pmsg *pubsub_pb.Message) string { - return string(hash.SHA256(pmsg.Data)) -} - // NewWakuRelay returns a new instance of a WakuRelay struct -func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { +func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, + reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { w := new(WakuRelay) w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) @@ -112,95 +81,9 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.log) - cfg := pubsub.DefaultGossipSubParams() - cfg.PruneBackoff = time.Minute - cfg.UnsubscribeBackoff = 5 * time.Second - cfg.GossipFactor = 0.25 - cfg.D = waku_proto.GossipSubOptimalFullMeshSize - cfg.Dlo = 4 - cfg.Dhi = 12 - cfg.Dout = 3 - cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize - cfg.HeartbeatInterval = time.Second - cfg.HistoryLength = 6 - cfg.HistoryGossip = 3 - cfg.FanoutTTL = time.Minute - - w.peerScoreParams = &pubsub.PeerScoreParams{ - Topics: make(map[string]*pubsub.TopicScoreParams), - DecayInterval: 12 * time.Second, // how often peer scoring is updated - DecayToZero: 0.01, // below this we consider the parameter to be zero - RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects - // p5: application specific, unset - AppSpecificScore: func(p peer.ID) float64 { - return 0 - }, - AppSpecificWeight: 0.0, - // p6: penalizes peers sharing more than threshold ips - IPColocationFactorWeight: -50, - IPColocationFactorThreshold: 5.0, - // p7: penalizes bad behaviour (weight and decay) - BehaviourPenaltyWeight: -10, - BehaviourPenaltyDecay: 0.986, - } - - w.peerScoreThresholds = &pubsub.PeerScoreThresholds{ - GossipThreshold: -100, // no gossip is sent to peers below this score - PublishThreshold: -1000, // no self-published msgs are sent to peers below this score - GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score - OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. - } - - w.topicParams = &pubsub.TopicScoreParams{ - TopicWeight: 1, - // p1: favours peers already in the mesh - TimeInMeshWeight: 0.01, - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 10.0, - // p2: rewards fast peers - FirstMessageDeliveriesWeight: 1.0, - FirstMessageDeliveriesDecay: 0.5, - FirstMessageDeliveriesCap: 10.0, - // p3: penalizes lazy peers. safe low value - MeshMessageDeliveriesWeight: 0, - MeshMessageDeliveriesDecay: 0, - MeshMessageDeliveriesCap: 0, - MeshMessageDeliveriesThreshold: 0, - MeshMessageDeliveriesWindow: 0, - MeshMessageDeliveriesActivation: 0, - // p3b: tracks history of prunes - MeshFailurePenaltyWeight: 0, - MeshFailurePenaltyDecay: 0, - // p4: penalizes invalid messages. highly penalize peers sending wrong messages - InvalidMessageDeliveriesWeight: -100.0, - InvalidMessageDeliveriesDecay: 0.5, - } - // default options required by WakuRelay - w.opts = append([]pubsub.Option{ - pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), - pubsub.WithNoAuthor(), - pubsub.WithMessageIdFn(msgIDFn), - pubsub.WithGossipSubProtocols( - []protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID}, - func(feat pubsub.GossipSubFeature, proto protocol.ID) bool { - switch feat { - case pubsub.GossipSubFeatureMesh: - return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 || proto == WakuRelayID_v200 - case pubsub.GossipSubFeaturePX: - return proto == pubsub.GossipSubID_v11 || proto == WakuRelayID_v200 - default: - return false - } - }, - ), - pubsub.WithGossipSubParams(cfg), - pubsub.WithFloodPublish(true), - pubsub.WithSeenMessagesTTL(2 * time.Minute), - pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds), - pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second), - }, opts...) - + w.opts = append(w.defaultPubsubOptions(), opts...) + w.contentSubs = make(map[string]map[int]*Subscription) return w } @@ -231,22 +114,16 @@ func (w *WakuRelay) Start(ctx context.Context) error { } func (w *WakuRelay) start() error { + if w.bcaster == nil { + return errors.New("broadcaster not specified for relay") + } ps, err := pubsub.NewGossipSub(w.Context(), w.host, w.opts...) if err != nil { return err } w.pubsub = ps - w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed)) - if err != nil { - return err - } - w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed)) - if err != nil { - return err - } - - w.emitters.EvtPeerTopic, err = w.events.Emitter(new(EvtPeerTopic)) + err = w.CreateEventEmitters() if err != nil { return err } @@ -312,7 +189,7 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { return pubSubTopic, nil } -func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) { +func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] if !ok { pubSubTopic, err := w.upsertTopic(topic) @@ -320,11 +197,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } - sub, err = pubSubTopic.Subscribe() + sub, err = pubSubTopic.Subscribe(pubsub.WithBufferSize(1024)) if err != nil { return nil, err } + w.WaitGroup().Add(1) + go w.pubsubTopicMsgHandler(topic, sub) + evtHandler, err := w.addPeerTopicEventListener(pubSubTopic) if err != nil { return nil, err @@ -337,10 +217,6 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } - if w.bcaster != nil { - w.WaitGroup().Add(1) - go w.subscribeToTopic(topic, sub) - } w.log.Info("subscribing to topic", zap.String("topic", sub.Topic())) } @@ -385,9 +261,29 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return hash, nil } -// Publish is used to broadcast a WakuMessage to the default waku pubsub topic +// Publish is used to broadcast a WakuMessage, the pubsubTopic is derived from contentTopic specified in the message via autosharding. +// To publish to a specific pubsubTopic, please use PublishToTopic func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { - return w.PublishToTopic(ctx, message, DefaultWakuTopic) + pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) + if err != nil { + return nil, err + } + return w.PublishToTopic(ctx, message, pubSubTopic) +} + +func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { + pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + return nil, err + } + contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic) + cSubs := w.contentSubs[pubSubTopic] + for _, sub := range cSubs { + if sub.contentFilter.Equals(contentFilter) { + return sub, nil + } + } + return nil, errors.New("no subscription found for content topic") } // Stop unmounts the relay protocol and stops all subscriptions @@ -409,112 +305,170 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool { return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish } -// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic -func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) { - _, err := w.subscribe(topic) +// subscribe returns list of Subscription to receive messages based on content filter +func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { + + var subscriptions []*Subscription + pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err } + params := new(RelaySubscribeParameters) - // Create client subscription - subscription := NoopSubscription() - if w.bcaster != nil { - subscription = w.bcaster.Register(topic, 1024) + var optList []RelaySubscribeOption + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } } - go func() { - <-ctx.Done() - subscription.Unsubscribe() - }() - return &subscription, nil + + for pubSubTopic, cTopics := range pubSubTopicMap { + w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics)) + var cFilter waku_proto.ContentFilter + cFilter.PubsubTopic = pubSubTopic + cFilter.ContentTopics = waku_proto.NewContentTopicSet(cTopics...) + + //Check if gossipsub subscription already exists for pubSubTopic + if !w.IsSubscribed(pubSubTopic) { + _, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic) + if err != nil { + //TODO: Handle partial errors. + return nil, err + } + } + + subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize)) + + // Create Content subscription + w.topicsMutex.RLock() + if _, ok := w.contentSubs[pubSubTopic]; !ok { + w.contentSubs[pubSubTopic] = map[int]*Subscription{} + } + w.contentSubs[pubSubTopic][subscription.ID] = subscription + + w.topicsMutex.RUnlock() + subscriptions = append(subscriptions, subscription) + go func() { + <-ctx.Done() + subscription.Unsubscribe() + }() + } + + return subscriptions, nil } -// Subscribe returns a Subscription to receive messages from the default waku pubsub topic -func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) { - return w.SubscribeToTopic(ctx, DefaultWakuTopic) +// Subscribe returns a Subscription to receive messages as per contentFilter +// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding) +func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { + return w.subscribe(ctx, contentFilter, opts...) } // Unsubscribe closes a subscription to a pubsub topic -func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { +func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error { + + pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return err + } + w.topicsMutex.Lock() defer w.topicsMutex.Unlock() - sub, ok := w.relaySubs[topic] - if !ok { - return fmt.Errorf("not subscribed to topic") + for pubSubTopic, cTopics := range pubSubTopicMap { + cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...) + pubsubUnsubscribe := false + sub, ok := w.relaySubs[pubSubTopic] + if !ok { + return errors.New("not subscribed to topic") + } + cSubs := w.contentSubs[pubSubTopic] + if cSubs != nil { + //Remove relevant subscription + for subID, sub := range cSubs { + if sub.contentFilter.Equals(cfTemp) { + sub.Unsubscribe() + delete(cSubs, subID) + } + } + if len(cSubs) == 0 { + pubsubUnsubscribe = true + } + } else { + //Should not land here ideally + w.log.Error("pubsub subscriptions exists, but contentSubscription doesn't for contentFilter", + zap.String("pubsubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics)) + + return errors.New("unexpected error in unsubscribe") + } + + if pubsubUnsubscribe { + err = w.unsubscribeFromPubsubTopic(sub) + if err != nil { + return err + } + } } - w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic())) - - w.relaySubs[topic].Cancel() - delete(w.relaySubs, topic) - - evtHandler, ok := w.topicEvtHanders[topic] - if ok { - evtHandler.Cancel() - delete(w.topicEvtHanders, topic) - } - - err := w.wakuRelayTopics[topic].Close() - if err != nil { - return err - } - delete(w.wakuRelayTopics, topic) - - w.RemoveTopicValidator(topic) - - err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic}) - if err != nil { - return err - } - return nil } -func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { - msgChannel := make(chan *pubsub.Message, 1024) - go func() { - defer close(msgChannel) - for { - msg, err := sub.Next(ctx) - if err != nil { - if !errors.Is(err, context.Canceled) { - w.log.Error("getting message from subscription", zap.Error(err)) - } - sub.Cancel() - return - } - msgChannel <- msg - } - }() - return msgChannel +// unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub. +// Note: caller has to acquire topicsMutex in order to avoid race conditions +func (w *WakuRelay) unsubscribeFromPubsubTopic(sub *pubsub.Subscription) error { + + pubSubTopic := sub.Topic() + w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic)) + + sub.Cancel() + delete(w.relaySubs, pubSubTopic) + + w.bcaster.UnRegister(pubSubTopic) + + delete(w.contentSubs, pubSubTopic) + + evtHandler, ok := w.topicEvtHanders[pubSubTopic] + if ok { + evtHandler.Cancel() + delete(w.topicEvtHanders, pubSubTopic) + } + + err := w.wakuRelayTopics[pubSubTopic].Close() + if err != nil { + return err + } + delete(w.wakuRelayTopics, pubSubTopic) + + w.RemoveTopicValidator(pubSubTopic) + + err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic}) + if err != nil { + return err + } + return nil } -func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) { +func (w *WakuRelay) pubsubTopicMsgHandler(pubsubTopic string, sub *pubsub.Subscription) { defer w.WaitGroup().Done() - subChannel := w.nextMessage(w.Context(), sub) for { - select { - case <-w.Context().Done(): + msg, err := sub.Next(w.Context()) + if err != nil { + if !errors.Is(err, context.Canceled) { + w.log.Error("getting message from subscription", zap.Error(err)) + } + sub.Cancel() return - // TODO: if there are no more relay subscriptions, close the pubsub subscription - case msg, ok := <-subChannel: - if !ok { - return - } - wakuMessage := &pb.WakuMessage{} - if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - w.log.Error("decoding message", zap.Error(err)) - return - } - - envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic) - - w.metrics.RecordMessage(envelope) - - if w.bcaster != nil { - w.bcaster.Submit(envelope) - } } + wakuMessage := &pb.WakuMessage{} + if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { + w.log.Error("decoding message", zap.Error(err)) + return + } + envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic) + w.metrics.RecordMessage(envelope) + + w.bcaster.Submit(envelope) } } @@ -523,51 +477,3 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio func (w *WakuRelay) Params() pubsub.GossipSubParams { return w.params } - -// Events returns the event bus on which WakuRelay events will be emitted -func (w *WakuRelay) Events() event.Bus { - return w.events -} - -func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.TopicEventHandler, error) { - handler, err := topic.EventHandler() - if err != nil { - return nil, err - } - w.WaitGroup().Add(1) - go w.topicEventPoll(topic.String(), handler) - return handler, nil -} - -func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) { - defer w.WaitGroup().Done() - for { - evt, err := handler.NextPeerEvent(w.Context()) - if err != nil { - if err == context.Canceled { - break - } - w.log.Error("failed to get next peer event", zap.String("topic", topic), zap.Error(err)) - continue - } - if evt.Peer.Validate() != nil { //Empty peerEvent is returned when context passed in done. - break - } - if evt.Type == pubsub.PeerJoin { - w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) - err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED}) - if err != nil { - w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err)) - } - } else if evt.Type == pubsub.PeerLeave { - w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) - err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT}) - if err != nil { - w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err)) - } - } else { - w.log.Error("unknown event type received", zap.String("topic", topic), - zap.Int("eventType", int(evt.Type))) - } - } -} diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 38b5d42f..e49db217 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "sync" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -26,14 +28,18 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcaster := NewBroadcaster(10) + relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) + + err = bcaster.Start(context.Background()) + require.NoError(t, err) defer relay.Stop() - sub, err := relay.subscribe(testTopic) - defer sub.Cancel() + subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + require.NoError(t, err) require.Equal(t, relay.IsSubscribed(testTopic), true) @@ -47,9 +53,8 @@ func TestWakuRelay(t *testing.T) { go func() { defer cancel() - _, err := sub.Next(ctx) - require.NoError(t, err) - + msg := <-subs[0].Ch + fmt.Println("msg received ", msg) }() msg := &pb.WakuMessage{ @@ -63,9 +68,8 @@ func TestWakuRelay(t *testing.T) { time.Sleep(2 * time.Second) - err = relay.Unsubscribe(ctx, testTopic) + err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic)) require.NoError(t, err) - <-ctx.Done() } @@ -74,9 +78,12 @@ func createRelayNode(t *testing.T) (host.Host, *WakuRelay) { require.NoError(t, err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - - relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcaster := NewBroadcaster(10) + relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) + err = bcaster.Start(context.Background()) + require.NoError(t, err) + return host, relay } @@ -102,7 +109,7 @@ func TestGossipsubScore(t *testing.T) { require.NoError(t, err) } - sub, err := relay[i].subscribe(testTopic) + sub, err := relay[i].subscribeToPubsubTopic(testTopic) require.NoError(t, err) go func() { for { @@ -160,3 +167,147 @@ func TestMsgID(t *testing.T) { require.Equal(t, expectedMsgIDBytes, []byte(msgID)) } + +func waitForTimeout(t *testing.T, ch chan *protocol.Envelope) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case env, ok := <-ch: + if ok { + t.Error("should not receive another message with payload", string(env.Message().Payload)) + } + case <-time.After(2 * time.Second): + // Timeout elapsed, all good + } + }() + + wg.Wait() +} + +func waitForMsg(t *testing.T, ch chan *protocol.Envelope, cTopicExpected string) *sync.WaitGroup { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + select { + case env := <-ch: + fmt.Println("msg received", env) + require.Equal(t, cTopicExpected, env.Message().GetContentTopic()) + case <-time.After(5 * time.Second): + t.Error("Message timeout") + } + }() + + return &wg +} + +func TestWakuRelayAutoShard(t *testing.T) { + testcTopic := "/toychat/2/huilong/proto" + testcTopic1 := "/toychat/1/huilong/proto" + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster := NewBroadcaster(10) + relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay.SetHost(host) + err = relay.Start(context.Background()) + require.NoError(t, err) + + err = bcaster.Start(context.Background()) + require.NoError(t, err) + defer relay.Stop() + defer bcaster.Stop() + + //Create a contentTopic level subscription + subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic)) + require.NoError(t, err) + require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true) + + sub, err := relay.GetSubscription(testcTopic) + require.NoError(t, err) + _, ok := sub.contentFilter.ContentTopics[testcTopic] + require.Equal(t, true, ok) + + _, err = relay.GetSubscription(testcTopic1) + require.Error(t, err) + + topics := relay.Topics() + require.Equal(t, 1, len(topics)) + require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0]) + + ctx, cancel := context.WithCancel(context.Background()) + bytesToSend := []byte{1} + defer cancel() + + //Create a pubSub level subscription + subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) + require.NoError(t, err) + + msg := &pb.WakuMessage{ + Payload: bytesToSend, + Version: 0, + ContentTopic: testcTopic, + Timestamp: 0, + } + _, err = relay.Publish(context.Background(), msg) + require.NoError(t, err) + + wg := waitForMsg(t, subs[0].Ch, testcTopic) + wg.Wait() + + wg = waitForMsg(t, subs1[0].Ch, testcTopic) + wg.Wait() + + //Test publishing to different content-topic + + msg1 := &pb.WakuMessage{ + Payload: bytesToSend, + Version: 0, + ContentTopic: testcTopic1, + Timestamp: 0, + } + + _, err = relay.PublishToTopic(context.Background(), msg1, subs[0].contentFilter.PubsubTopic) + require.NoError(t, err) + + wg = waitForMsg(t, subs1[0].Ch, testcTopic1) + wg.Wait() + + //Should not receive message as subscription is for a different cTopic. + waitForTimeout(t, subs[0].Ch) + err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic)) + require.NoError(t, err) + _, err = relay.GetSubscription(testcTopic) + require.Error(t, err) + _, err = relay.GetSubscription(testcTopic1) + require.Error(t, err) + + topics = relay.Topics() + require.Equal(t, 1, len(topics)) + require.Equal(t, subs[0].contentFilter.PubsubTopic, topics[0]) + wg2 := waitForMsg(t, subs1[0].Ch, testcTopic1) + + msg2 := &pb.WakuMessage{ + Payload: bytesToSend, + Version: 0, + ContentTopic: testcTopic1, + Timestamp: 1, + } + + _, err = relay.PublishToTopic(context.Background(), msg2, subs[0].contentFilter.PubsubTopic) + require.NoError(t, err) + wg2.Wait() + + err = relay.Unsubscribe(ctx, protocol.NewContentFilter("", testcTopic)) + require.NoError(t, err) + + err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) + require.NoError(t, err) + +} diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index c1401be8..2037c5a3 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -32,9 +32,11 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { host, err := tests.MakeHost(context.Background(), port, rand.Reader) s.Require().NoError(err) - - relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcaster := relay.NewBroadcaster(1024) + relay := relay.NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) + err = bcaster.Start(context.Background()) + s.Require().NoError(err) err = relay.Start(context.Background()) s.Require().NoError(err) defer relay.Stop() diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index dea6e0e3..07e4f5b2 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -47,7 +47,8 @@ func TestResume(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx, relay.NoopSubscription()) + sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + err = s1.Start(ctx, sub) require.NoError(t, err) defer s1.Stop() @@ -69,7 +70,9 @@ func TestResume(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() @@ -107,7 +110,9 @@ func TestResumeWithListOfPeers(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx, relay.NoopSubscription()) + sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s1.Start(ctx, sub) require.NoError(t, err) defer s1.Stop() @@ -121,7 +126,9 @@ func TestResumeWithListOfPeers(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() @@ -148,7 +155,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx, relay.NoopSubscription()) + sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s1.Start(ctx, sub) require.NoError(t, err) defer s1.Stop() @@ -162,7 +171,9 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 079e1103..a19b8a42 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -52,7 +52,7 @@ type WakuStore struct { cancel context.CancelFunc timesource timesource.Timesource metrics Metrics - MsgC relay.Subscription + MsgC *relay.Subscription wg *sync.WaitGroup log *zap.Logger diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 17678a06..afb5c5a4 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -85,7 +85,7 @@ type MessageProvider interface { type Store interface { SetHost(h host.Host) - Start(context.Context, relay.Subscription) error + Start(context.Context, *relay.Subscription) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) @@ -104,7 +104,7 @@ func (store *WakuStore) SetHost(h host.Host) { } // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider -func (store *WakuStore) Start(ctx context.Context, sub relay.Subscription) error { +func (store *WakuStore) Start(ctx context.Context, sub *relay.Subscription) error { if store.started { return nil } diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 690861c9..9929f6e9 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -17,6 +17,19 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) +// SimulateSubscription creates a subscription for a list of envelopes +func SimulateSubscription(msgs []*protocol.Envelope) *relay.Subscription { + ch := make(chan *protocol.Envelope, len(msgs)) + for _, msg := range msgs { + ch <- msg + } + close(ch) + return &relay.Subscription{ + Unsubscribe: func() {}, + Ch: ch, + } +} + func TestWakuStoreProtocolQuery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -38,7 +51,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { } // Simulate a message has been received via relay protocol - sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) + sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) err = s1.Start(ctx, sub) require.NoError(t, err) defer s1.Stop() @@ -47,7 +60,9 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() @@ -88,7 +103,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { } // Simulate a message has been received via relay protocol - sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) + sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) err = s1.Start(ctx, sub) require.NoError(t, err) defer s1.Stop() @@ -127,7 +142,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { msg4 := tests.CreateWakuMessage(topic1, now+4) msg5 := tests.CreateWakuMessage(topic1, now+5) - sub := relay.ArraySubscription([]*protocol.Envelope{ + sub := SimulateSubscription([]*protocol.Envelope{ protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), @@ -146,7 +161,9 @@ func TestWakuStoreProtocolNext(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() @@ -202,7 +219,7 @@ func TestWakuStoreResult(t *testing.T) { msg4 := tests.CreateWakuMessage(topic1, now+4) msg5 := tests.CreateWakuMessage(topic1, now+5) - sub := relay.ArraySubscription([]*protocol.Envelope{ + sub := SimulateSubscription([]*protocol.Envelope{ protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), @@ -221,7 +238,9 @@ func TestWakuStoreResult(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop() @@ -296,7 +315,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { msg8 := tests.CreateWakuMessage(topic1, now+8) msg9 := tests.CreateWakuMessage(topic1, now+9) - sub := relay.ArraySubscription([]*protocol.Envelope{ + sub := SimulateSubscription([]*protocol.Envelope{ protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), @@ -320,7 +339,10 @@ func TestWakuStoreProtocolFind(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx, relay.NoopSubscription()) + + sub1 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub1) require.NoError(t, err) defer s2.Stop()