From 73bcb2e78a3aa1c5a64d1067ec34588acef8979e Mon Sep 17 00:00:00 2001 From: harsh jain Date: Mon, 13 Nov 2023 19:17:43 +0700 Subject: [PATCH 1/2] feat: add dns discovery in lib (#884) --- cmd/waku/node.go | 59 +++++++++++---------------------------- examples/basic2/go.mod | 3 +- examples/basic2/go.sum | 1 + examples/filter2/go.mod | 3 +- examples/filter2/go.sum | 1 + examples/rln/go.mod | 3 +- examples/rln/go.sum | 1 + library/config.go | 42 +++++++++++++++------------- library/node.go | 14 ++++------ waku/v2/node/wakunode2.go | 39 ++++++++++++++++++++++++++ 10 files changed, 91 insertions(+), 75 deletions(-) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index a3bd6c4e..08de3e2f 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -19,12 +19,12 @@ import ( "github.com/urfave/cli/v2" dbutils "github.com/waku-org/go-waku/waku/persistence/utils" + "github.com/waku-org/go-waku/waku/v2/dnsdisc" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/p2p/enode" dssql "github.com/ipfs/go-ds-sql" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -44,7 +44,6 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" @@ -279,47 +278,6 @@ func Execute(options NodeOptions) error { nodeOpts = append(nodeOpts, node.WithLightPush()) } - var discoveredNodes []dnsdisc.DiscoveredNode - if options.DNSDiscovery.Enable { - if len(options.DNSDiscovery.URLs.Value()) != 0 { - for _, url := range options.DNSDiscovery.URLs.Value() { - logger.Info("attempting DNS discovery with ", zap.String("URL", url)) - nodes, err := dnsdisc.RetrieveNodes(ctx, url, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) - if err != nil { - logger.Warn("dns discovery error ", zap.Error(err)) - } else { - var discPeerInfo []peer.AddrInfo - for _, n := range nodes { - discPeerInfo = append(discPeerInfo, n.PeerInfo) - } - logger.Info("found dns entries ", zap.Any("nodes", discPeerInfo)) - discoveredNodes = append(discoveredNodes, nodes...) - } - } - } else { - return nonRecoverErrorMsg("DNS discovery URL is required") - } - } - - if options.DiscV5.Enable { - var bootnodes []*enode.Node - for _, addr := range options.DiscV5.Nodes.Value() { - bootnode, err := enode.Parse(enode.ValidSchemes, addr) - if err != nil { - logger.Fatal("parsing ENR", zap.Error(err)) - } - bootnodes = append(bootnodes, bootnode) - } - - for _, n := range discoveredNodes { - if n.ENR != nil { - bootnodes = append(bootnodes, n.ENR) - } - } - - nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate)) - } - if options.PeerExchange.Enable { nodeOpts = append(nodeOpts, node.WithPeerExchange()) } @@ -357,6 +315,21 @@ func Execute(options NodeOptions) error { } } + var discoveredNodes []dnsdisc.DiscoveredNode + if options.DNSDiscovery.Enable { + if len(options.DNSDiscovery.URLs.Value()) == 0 { + return nonRecoverErrorMsg("DNS discovery URL is required") + } + discoveredNodes = node.GetNodesFromDNSDiscovery(logger, ctx, options.DNSDiscovery.Nameserver, options.DNSDiscovery.URLs.Value()) + } + if options.DiscV5.Enable { + discv5Opts, err := node.GetDiscv5Option(discoveredNodes, options.DiscV5.Nodes.Value(), options.DiscV5.Port, options.DiscV5.AutoUpdate) + if err != nil { + logger.Fatal("parsing ENR", zap.Error(err)) + } + nodeOpts = append(nodeOpts, discv5Opts) + } + if err = wakuNode.Start(ctx); err != nil { return nonRecoverError(err) } diff --git a/examples/basic2/go.mod b/examples/basic2/go.mod index 9e244c56..954987ba 100644 --- a/examples/basic2/go.mod +++ b/examples/basic2/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.10.26 github.com/waku-org/go-waku v0.2.3-0.20221109195301-b2a5a68d28ba go.uber.org/zap v1.24.0 + google.golang.org/protobuf v1.31.0 ) require ( @@ -123,8 +124,8 @@ require ( golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.12.1-0.20230818130535-1517d1a3ba60 // indirect - google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/basic2/go.sum b/examples/basic2/go.sum index 2f151798..fc06c781 100644 --- a/examples/basic2/go.sum +++ b/examples/basic2/go.sum @@ -866,6 +866,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/examples/filter2/go.mod b/examples/filter2/go.mod index 1205e2ad..f0c2c609 100644 --- a/examples/filter2/go.mod +++ b/examples/filter2/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.10.26 github.com/ipfs/go-log/v2 v2.5.1 github.com/waku-org/go-waku v0.2.3-0.20221109195301-b2a5a68d28ba + google.golang.org/protobuf v1.31.0 ) require ( @@ -123,8 +124,8 @@ require ( golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.12.1-0.20230818130535-1517d1a3ba60 // indirect - google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index 2f151798..fc06c781 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -866,6 +866,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/examples/rln/go.mod b/examples/rln/go.mod index 3582c132..f4ce511a 100644 --- a/examples/rln/go.mod +++ b/examples/rln/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.10.26 github.com/waku-org/go-waku v0.2.3-0.20221109195301-b2a5a68d28ba go.uber.org/zap v1.24.0 + google.golang.org/protobuf v1.31.0 ) require ( @@ -123,8 +124,8 @@ require ( golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/tools v0.12.1-0.20230818130535-1517d1a3ba60 // indirect - google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/examples/rln/go.sum b/examples/rln/go.sum index 2f151798..fc06c781 100644 --- a/examples/rln/go.sum +++ b/examples/rln/go.sum @@ -866,6 +866,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/library/config.go b/library/config.go index cce1a108..22bd2883 100644 --- a/library/config.go +++ b/library/config.go @@ -10,26 +10,28 @@ import ( // WakuConfig contains all the configuration settings exposed to users of mobile and c libraries type WakuConfig struct { - Host *string `json:"host,omitempty"` - Port *int `json:"port,omitempty"` - AdvertiseAddress *string `json:"advertiseAddr,omitempty"` - NodeKey *string `json:"nodeKey,omitempty"` - LogLevel *string `json:"logLevel,omitempty"` - KeepAliveInterval *int `json:"keepAliveInterval,omitempty"` - EnableRelay *bool `json:"relay"` - RelayTopics []string `json:"relayTopics,omitempty"` - GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"` - EnableLegacyFilter *bool `json:"legacyFilter,omitempty"` - MinPeersToPublish *int `json:"minPeersToPublish,omitempty"` - EnableDiscV5 *bool `json:"discV5,omitempty"` - DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"` - DiscV5UDPPort *uint `json:"discV5UDPPort,omitempty"` - EnableStore *bool `json:"store,omitempty"` - DatabaseURL *string `json:"databaseURL,omitempty"` - RetentionMaxMessages *int `json:"storeRetentionMaxMessages,omitempty"` - RetentionTimeSeconds *int `json:"storeRetentionTimeSeconds,omitempty"` - DNS4DomainName string `json:"dns4DomainName,omitempty"` - Websockets *WebsocketConfig `json:"websockets,omitempty"` + Host *string `json:"host,omitempty"` + Port *int `json:"port,omitempty"` + AdvertiseAddress *string `json:"advertiseAddr,omitempty"` + NodeKey *string `json:"nodeKey,omitempty"` + LogLevel *string `json:"logLevel,omitempty"` + KeepAliveInterval *int `json:"keepAliveInterval,omitempty"` + EnableRelay *bool `json:"relay"` + RelayTopics []string `json:"relayTopics,omitempty"` + GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"` + EnableLegacyFilter *bool `json:"legacyFilter,omitempty"` + MinPeersToPublish *int `json:"minPeersToPublish,omitempty"` + DNSDiscoveryURLs []string `json:"dnsDiscoveryURLs,omitempty"` + DNSDiscoveryNameServer string `json:"dnsDiscoveryNameServer,omitempty"` + EnableDiscV5 *bool `json:"discV5,omitempty"` + DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"` + DiscV5UDPPort *uint `json:"discV5UDPPort,omitempty"` + EnableStore *bool `json:"store,omitempty"` + DatabaseURL *string `json:"databaseURL,omitempty"` + RetentionMaxMessages *int `json:"storeRetentionMaxMessages,omitempty"` + RetentionTimeSeconds *int `json:"storeRetentionTimeSeconds,omitempty"` + DNS4DomainName string `json:"dns4DomainName,omitempty"` + Websockets *WebsocketConfig `json:"websockets,omitempty"` } // WebsocketConfig contains all the settings required to setup websocket support in waku diff --git a/library/node.go b/library/node.go index eaacd8da..cc8959c2 100644 --- a/library/node.go +++ b/library/node.go @@ -15,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/p2p/enode" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" @@ -145,15 +144,12 @@ func NewNode(configJSON string) error { } if *config.EnableDiscV5 { - var bootnodes []*enode.Node - for _, addr := range config.DiscV5BootstrapNodes { - bootnode, err := enode.Parse(enode.ValidSchemes, addr) - if err != nil { - return err - } - bootnodes = append(bootnodes, bootnode) + discoveredNodes := node.GetNodesFromDNSDiscovery(utils.Logger(), context.TODO(), config.DNSDiscoveryNameServer, config.DNSDiscoveryURLs) + discv5Opts, err := node.GetDiscv5Option(discoveredNodes, config.DiscV5BootstrapNodes, *config.DiscV5UDPPort, true) + if err != nil { + return err } - opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true)) + opts = append(opts, discv5Opts) } wakuState.relayTopics = config.RelayTopics diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0bb611f7..5b4dbd50 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -30,6 +30,7 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/discv5" + "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/peermanager" wps "github.com/waku-org/go-waku/waku/v2/peerstore" wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol" @@ -928,3 +929,41 @@ func (w *WakuNode) findRelayNodes(ctx context.Context) { } } } + +func GetNodesFromDNSDiscovery(logger *zap.Logger, ctx context.Context, nameServer string, discoveryURLs []string) []dnsdisc.DiscoveredNode { + var discoveredNodes []dnsdisc.DiscoveredNode + for _, url := range discoveryURLs { + logger.Info("attempting DNS discovery with ", zap.String("URL", url)) + nodes, err := dnsdisc.RetrieveNodes(ctx, url, dnsdisc.WithNameserver(nameServer)) + if err != nil { + logger.Warn("dns discovery error ", zap.Error(err)) + } else { + var discPeerInfo []peer.AddrInfo + for _, n := range nodes { + discPeerInfo = append(discPeerInfo, n.PeerInfo) + } + logger.Info("found dns entries ", zap.Any("nodes", discPeerInfo)) + discoveredNodes = append(discoveredNodes, nodes...) + } + } + return discoveredNodes +} + +func GetDiscv5Option(dnsDiscoveredNodes []dnsdisc.DiscoveredNode, discv5Nodes []string, port uint, autoUpdate bool) (WakuNodeOption, error) { + var bootnodes []*enode.Node + for _, addr := range discv5Nodes { + bootnode, err := enode.Parse(enode.ValidSchemes, addr) + if err != nil { + return nil, err + } + bootnodes = append(bootnodes, bootnode) + } + + for _, n := range dnsDiscoveredNodes { + if n.ENR != nil { + bootnodes = append(bootnodes, n.ENR) + } + } + + return WithDiscoveryV5(port, bootnodes, autoUpdate), nil +} From a5ce5dfaa47b305efe460a84ebb7164180e2b8cb Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 14 Nov 2023 04:22:46 +0530 Subject: [PATCH 2/2] feat: update store client Query API for autosharding (#885) --- cmd/waku/server/rest/store.go | 52 ++++----- cmd/waku/server/rpc/store.go | 2 +- library/store.go | 2 +- waku/v2/node/wakunode2.go | 8 +- waku/v2/peermanager/peer_discovery.go | 18 +-- waku/v2/peermanager/peer_manager.go | 51 +++++++-- waku/v2/peermanager/peer_manager_test.go | 16 +-- waku/v2/peermanager/peer_selection.go | 38 ++++--- waku/v2/peerstore/waku_peer_store.go | 34 +++++- waku/v2/protocol/content_filter.go | 20 +--- waku/v2/protocol/filter/client.go | 2 +- .../filter/filter_proto_ident_test.go | 2 +- waku/v2/protocol/legacy_filter/waku_filter.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 2 +- waku/v2/protocol/shard.go | 23 ++++ waku/v2/protocol/store/waku_store_client.go | 107 ++++++++++++++---- .../store/waku_store_protocol_test.go | 10 +- waku/v2/rendezvous/rendezvous.go | 2 +- waku/v2/service/common_discovery_service.go | 2 +- 19 files changed, 258 insertions(+), 135 deletions(-) diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index ce31e8c3..19f3e048 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -12,10 +12,8 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) type StoreService struct { @@ -57,24 +55,20 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { return s } -func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { +func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) { query := &store.Query{} var options []store.HistoryRequestOption - + var err error peerAddrStr := r.URL.Query().Get("peerAddr") - m, err := multiaddr.NewMultiaddr(peerAddrStr) - if err != nil { - return nil, nil, nil, err + var m multiaddr.Multiaddr + if peerAddrStr != "" { + m, err = multiaddr.NewMultiaddr(peerAddrStr) + if err != nil { + return nil, nil, err + } + options = append(options, store.WithPeerAddr(m)) } - - peerID, err := utils.GetPeerID(m) - if err != nil { - return nil, nil, nil, err - } - - options = append(options, store.WithPeer(peerID)) - - query.Topic = r.URL.Query().Get("pubsubTopic") + query.PubsubTopic = r.URL.Query().Get("pubsubTopic") contentTopics := r.URL.Query().Get("contentTopics") if contentTopics != "" { @@ -85,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if startTimeStr != "" { startTime, err := strconv.ParseInt(startTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.StartTime = &startTime } @@ -94,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if endTimeStr != "" { endTime, err := strconv.ParseInt(endTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.EndTime = &endTime } @@ -111,25 +105,25 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if senderTimeStr != "" { cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if storeTimeStr != "" { cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if digestStr != "" { cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } - cursor.PubsubTopic = query.Topic + cursor.PubsubTopic = query.PubsubTopic options = append(options, store.WithCursor(cursor)) } @@ -142,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if ascendingStr != "" { ascending, err = strconv.ParseBool(ascendingStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if pageSizeStr != "" { pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } options = append(options, store.WithPaging(ascending, pageSize)) } - return m, query, options, nil + return query, options, nil } func writeStoreError(w http.ResponseWriter, code int, err error) { @@ -190,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse { } func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { - peerAddr, query, options, err := getStoreParams(r) + query, options, err := getStoreParams(r) if err != nil { writeStoreError(w, http.StatusBadRequest, err) return @@ -199,12 +193,6 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - _, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics()) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return - } - result, err := d.node.Store().Query(ctx, *query, options...) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) diff --git a/cmd/waku/server/rpc/store.go b/cmd/waku/server/rpc/store.go index 84f3f64c..f39bb645 100644 --- a/cmd/waku/server/rpc/store.go +++ b/cmd/waku/server/rpc/store.go @@ -48,7 +48,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, res, err := s.node.Store().Query( req.Context(), store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentFilters, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/library/store.go b/library/store.go index e539e8be..9c8c8c3d 100644 --- a/library/store.go +++ b/library/store.go @@ -39,7 +39,7 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store. res, err := wakuState.node.Store().Query( ctx, store.Query{ - Topic: args.Topic, + PubsubTopic: args.Topic, ContentTopics: args.ContentTopics, StartTime: args.StartTime, EndTime: args.EndTime, diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5b4dbd50..a9f22657 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + if err != nil { + return "", err + } + return pData.AddrInfo.ID, nil } // AddDiscoveredPeer to add a discovered peer to the node peerStore @@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp ID: ID, Addrs: addrs, }, - PubSubTopics: pubsubTopics, + PubsubTopics: pubsubTopics, } w.peermanager.AddDiscoveredPeer(p, connectNow) } diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index 72ee3077..fd8b316b 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, return peers, nil } -func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { - shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) +func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) { + shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...) if err != nil { - pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) + pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err)) return } - if len(shardInfo) > 0 { - err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) - if err != nil { - pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + if len(shardsInfo) > 0 { + for _, shardInfo := range shardsInfo { + err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount) + if err != nil { + pm.logger.Error("failed to discover and conenct to peers", zap.Error(err)) + } } } else { - pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic)) + pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics)) } } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index f309b008..c1169129 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { - pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) + pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) continue } //Connect to eligible peers. @@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } else { if shards != nil { - p.PubSubTopics = make([]string, 0) + p.PubsubTopics = make([]string, 0) topics := shards.Topics() for _, topic := range topics { topicStr := topic.String() - p.PubSubTopics = append(p.PubSubTopics, topicStr) + p.PubsubTopics = append(p.PubsubTopics, topicStr) } } else { pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) @@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { return } supportedProtos := []protocol.ID{} - if len(p.PubSubTopics) == 0 && p.ENR != nil { + if len(p.PubsubTopics) == 0 && p.ENR != nil { // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. supportedProtos = pm.processPeerENR(&p) } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...) if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig return nil } +func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData { + addrs := host.Peerstore().Addrs(peerID) + if len(addrs) == 0 { + //Addresses expired, remove peer from peerStore + host.Peerstore().RemovePeer(peerID) + return nil + } + return &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: peerID, + Addrs: addrs, + }, + PubsubTopics: pubsubTopics, + } +} + // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { //Assuming all addresses have peerId info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { - return "", err + return nil, err } //Add Service peers to serviceSlots. @@ -433,12 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo } //Add to the peer-store - err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...) + err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...) if err != nil { - return "", err + return nil, err } - return info.ID, nil + pData := &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: info.ID, + Addrs: info.Addrs, + }, + PubsubTopics: pubsubTopics, + } + + return pData, nil +} + +// Connect establishes a connection to a peer. +func (pm *PeerManager) Connect(pData *service.PeerData) { + go pm.peerConnector.PushToChan(*pData) } // RemovePeer deletes peer from the peerStore after disconnecting it. diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 8d90f87c..5b289093 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) require.Equal(t, peerID, h4.ID()) - _, err = pm.SelectPeerByContentTopic(protocol1, "") + _, err = pm.SelectPeerByContentTopics(protocol1, []string{""}) require.Error(t, wakuproto.ErrInvalidFormat, err) } @@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) { _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) } @@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.NoError(t, err) //Discovery should fail for non-waku protocol - _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"}) require.Error(t, err) _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) @@ -299,7 +299,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { var enrField uint8 enrField |= (1 << 1) pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) - peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host2.ID()) @@ -307,7 +307,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) { enrField1 |= (1 << 3) pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) - peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) require.NoError(t, err) require.Equal(t, peerID, host1.ID()) diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index e785c177..72e1d062 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -20,12 +20,16 @@ import ( // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { - pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - return "", err +func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopics := []string{} + for _, cTopic := range contentTopics { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return "", err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopics: pubsubTopics, Proto: proto, SpecificPeers: specificPeers}) } // SelectRandomPeer is used to return a random peer that supports a given protocol. @@ -40,12 +44,12 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID // - which topics they track // - latency? - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) + peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopics, criteria.Ctx, criteria.SpecificPeers...) if err == nil { return peerID, nil } else if !errors.Is(err, ErrNoPeersAvailable) { pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), - zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) + zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) return "", err } @@ -54,34 +58,34 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID if err != nil { return "", err } - if criteria.PubsubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) + if len(criteria.PubsubTopics) > 0 { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } return selectRandomPeer(filteredPeers, pm.logger) } -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { var peerID peer.ID var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if pubsubTopic == "" { + if len(pubsubTopics) == 0 || (len(pubsubTopics) == 1 && pubsubTopics[0] == "") { return slot.getRandom() } else { //PubsubTopic based selection keys := make([]peer.ID, 0, len(slot.m)) for i := range slot.m { keys = append(keys, i) } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...) + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(pubsubTopics, keys...) peerID, err = selectRandomPeer(selectedPeers, pm.logger) if err == nil { return peerID, nil } else { - pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) + pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) //Trigger on-demand discovery for this topic and connect to peer immediately. //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) + pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1) //Try to fetch peers again. continue } @@ -98,7 +102,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, type PeerSelectionCriteria struct { SelectionType PeerSelection Proto protocol.ID - PubsubTopic string + PubsubTopics []string SpecificPeers peer.IDSlice Ctx context.Context } @@ -135,8 +139,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( criteria.Ctx = context.Background() } - if criteria.PubsubTopic != "" { - peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) } peers, err = pm.FilterPeersByProto(peers, criteria.Proto) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index b1b1ac4f..5495a577 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -59,6 +59,7 @@ type WakuPeerstore interface { RemovePubSubTopic(p peer.ID, topic string) error PubSubTopics(p peer.ID) ([]string, error) SetPubSubTopics(p peer.ID, topics []string) error + PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice } @@ -207,7 +208,38 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { return result.([]string), nil } -// PeersByPubSubTopic Returns list of peers by pubSubTopic +// PeersByPubSubTopic Returns list of peers that support list of pubSubTopics +// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore +func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice { + if specificPeers == nil { + specificPeers = ps.Peers() + } + var result peer.IDSlice + for _, p := range specificPeers { + topics, err := ps.PubSubTopics(p) + if err == nil { + //Convoluted and crazy logic to find subset of topics + // Could not find a better way to do it? + peerTopicMap := make(map[string]struct{}) + match := true + for _, topic := range topics { + peerTopicMap[topic] = struct{}{} + } + for _, topic := range pubSubTopics { + if _, ok := peerTopicMap[topic]; !ok { + match = false + break + } + } + if match { + result = append(result, p) + } + } //Note: skipping a peer in case of an error as there would be others available. + } + return result +} + +// PeersByPubSubTopic Returns list of peers that support a single pubSubTopic // If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice { if specificPeers == nil { diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go index 7ea7f2a5..f09cf52b 100644 --- a/waku/v2/protocol/content_filter.go +++ b/waku/v2/protocol/content_filter.go @@ -52,23 +52,5 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool { // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) { - pubSubTopicMap := make(map[string][]string) - - if contentFilter.PubsubTopic != "" { - pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() - } else { - //Parse the content-Topics to figure out shards. - for _, cTopicString := range contentFilter.ContentTopicsList() { - pTopicStr, err := GetPubSubTopicFromContentTopic(cTopicString) - if err != nil { - return nil, err - } - _, ok := pubSubTopicMap[pTopicStr] - if !ok { - pubSubTopicMap[pTopicStr] = []string{} - } - pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) - } - } - return pubSubTopicMap, nil + return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList()) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index e3c9af32..aa0c38d5 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -312,7 +312,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index d67c76bf..9168b03f 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -178,7 +178,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, - PubsubTopic: pubSubTopic, + PubsubTopics: []string{pubSubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 8a7b00b3..0d87f718 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -256,7 +256,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterID_v20beta1, - PubsubTopic: filter.Topic, + PubsubTopics: []string{filter.Topic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 10e78662..f1890ed9 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -254,7 +254,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: LightPushID_v20beta1, - PubsubTopic: params.pubsubTopic, + PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, Ctx: ctx, }, diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 3f4779d2..66ec5fdc 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -245,3 +245,26 @@ func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) { return pTopic.String(), nil } + +func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) { + + pubSubTopicMap := make(map[string][]string, 0) + + if pubsubTopic == "" { + //Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly? + for _, cTopic := range contentTopics { + pTopic, err := GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + _, ok := pubSubTopicMap[pTopic] + if !ok { + pubSubTopicMap[pTopic] = []string{} + } + pubSubTopicMap[pTopic] = append(pubSubTopicMap[pTopic], cTopic) + } + } else { + pubSubTopicMap[pubsubTopic] = append(pubSubTopicMap[pubsubTopic], contentTopics...) + } + return pubSubTopicMap, nil +} diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 49e80374..fb2d2e95 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -8,17 +8,19 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) type Query struct { - Topic string + PubsubTopic string ContentTopics []string StartTime *int64 EndTime *int64 @@ -82,6 +84,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice localQuery bool @@ -93,12 +96,31 @@ type HistoryRequestParameters struct { s *WakuStore } -type HistoryRequestOption func(*HistoryRequestParameters) +type HistoryRequestOption func(*HistoryRequestParameters) error -// WithPeer is an option used to specify the peerID to request the message history +// WithPeer is an option used to specify the peerID to request the message history. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerId and peerAddr options are mutually exclusive") + } + return nil + } +} + +//WithPeerAddr is an option used to specify a peerAddress to request the message history. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. + +func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption { + return func(params *HistoryRequestParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -108,9 +130,10 @@ func WithPeer(p peer.ID) HistoryRequestOption { // from the node peerstore // Note: This option is avaiable only with peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -120,44 +143,50 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // from the node peerstore // Note: This option is avaiable only with peerManager func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } // WithRequestID is an option to set a specific request ID to be used when // creating a store request func WithRequestID(requestID []byte) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a store request func WithAutomaticRequestID() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } func WithCursor(c *pb.Index) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.cursor = c + return nil } } // WithPaging is an option used to specify the order and maximum number of records to return func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.asc = asc params.pageSize = pageSize + return nil } } func WithLocalQuery() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.localQuery = true + return nil } } @@ -253,28 +282,56 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { - opt(params) - } - if store.pm != nil && params.selectedPeer == "" { - var err error - params.selectedPeer, err = store.pm.SelectPeer( - peermanager.PeerSelectionCriteria{ - SelectionType: params.peerSelectionType, - Proto: StoreID_v20beta4, - PubsubTopic: query.Topic, - SpecificPeers: params.preferredPeers, - Ctx: ctx, - }, - ) + err := opt(params) if err != nil { return nil, err } } + if !params.localQuery { + pubsubTopics := []string{} + if query.PubsubTopic == "" { + for _, cTopic := range query.ContentTopics { + pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(cTopic) + if err != nil { + return nil, err + } + pubsubTopics = append(pubsubTopics, pubsubTopic) + } + } else { + pubsubTopics = append(pubsubTopics, query.PubsubTopic) + } + + //Add Peer to peerstore. + if store.pm != nil && params.peerAddr != nil { + pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + if err != nil { + return nil, err + } + store.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if store.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = store.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, + PubsubTopics: pubsubTopics, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } + } + } + historyRequest := &pb.HistoryRPC{ RequestId: hex.EncodeToString(params.requestID), Query: &pb.HistoryQuery{ - PubsubTopic: query.Topic, + PubsubTopic: query.PubsubTopic, ContentFilters: []*pb.ContentFilter{}, StartTime: query.StartTime, EndTime: query.EndTime, diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 82365110..ed034b55 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -71,7 +71,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { require.NoError(t, err) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -110,7 +110,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { time.Sleep(100 * time.Millisecond) q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } response, err := s1.Query(ctx, q, WithLocalQuery()) @@ -167,7 +167,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -244,7 +244,7 @@ func TestWakuStoreResult(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, ContentTopics: []string{topic1}, } @@ -346,7 +346,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { defer s2.Stop() q := Query{ - Topic: pubsubTopic1, + PubsubTopic: pubsubTopic1, } fn := func(msg *pb.WakuMessage) (bool, error) { diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index a53a74a4..76c63ff5 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -107,7 +107,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string peer := service.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, - PubSubTopics: []string{namespace}, + PubsubTopics: []string{namespace}, } if !r.PushToChan(peer) { r.log.Error("could push to closed channel/context completed") diff --git a/waku/v2/service/common_discovery_service.go b/waku/v2/service/common_discovery_service.go index 72bf96f1..c22f18f1 100644 --- a/waku/v2/service/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -14,7 +14,7 @@ type PeerData struct { Origin wps.Origin AddrInfo peer.AddrInfo ENR *enode.Node - PubSubTopics []string + PubsubTopics []string } type CommonDiscoveryService struct {