diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index 5f0f47d7..74ba35ce 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -338,6 +338,14 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error { } +func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error { + err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs)) + if err != nil { + return err + } + return nil +} + func (w *WakuNode) watchTopicShards(ctx context.Context) error { evtRelaySubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelaySubscribed)) if err != nil { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 4eefa7e2..c29a2b93 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -457,16 +457,25 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.filterLightNode.SetHost(host) + + err = w.setupENR(ctx, w.ListenAddresses()) + if err != nil { + return err + } + if w.opts.enableFilterLightNode { err := w.filterLightNode.Start(ctx) if err != nil { return err } - } - - err = w.setupENR(ctx, w.ListenAddresses()) - if err != nil { - return err + //TODO: setting this up temporarily to improve connectivity success for lightNode in status. + //This will have to be removed or changed with community sharding will be implemented. + if w.opts.shards != nil { + err = w.SetRelayShards(*w.opts.shards) + if err != nil { + return err + } + } } w.peerExchange.SetHost(host) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2e34ace7..445065de 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" @@ -53,6 +54,7 @@ type WakuNodeParameters struct { hostAddr *net.TCPAddr maxConnectionsPerIP int clusterID uint16 + shards *protocol.RelayShards dns4Domain string advertiseAddrs []multiaddr.Multiaddr multiAddr []multiaddr.Multiaddr @@ -317,6 +319,23 @@ func WithClusterID(clusterID uint16) WakuNodeOption { } } +func WithPubSubTopics(topics []string) WakuNodeOption { + return func(params *WakuNodeParameters) error { + rs, err := protocol.TopicsToRelayShards(topics...) + if err != nil { + return err + } + if len(rs) == 0 { + return nil + } + if rs[0].ClusterID != params.clusterID { + return errors.New("pubsubtopics have different clusterID than configured clusterID") + } + params.shards = &rs[0] //Only consider 0 as a node can only support 1 cluster as of now + return nil + } +} + // WithMaxConnectionsPerIP sets the max number of allowed peers from the same IP func WithMaxConnectionsPerIP(limit int) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index b8f3c0f3..7775b557 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -328,9 +328,9 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic - logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers)) + logger := message.Logger(wakuLP.log, params.pubsubTopic) - logger.Debug("publishing message") + logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers)) var wg sync.WaitGroup var responses []*pb.PushResponse for _, peerID := range params.selectedPeers { diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 6129d6c0..23a7e455 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -117,8 +117,6 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak request := &pb.WakuMetadataRequest{} request.ClusterId = clusterID request.Shards = shards - // TODO: remove with nwaku 0.28 deployment - request.ShardsDeprecated = shards // nolint: staticcheck writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) @@ -173,8 +171,6 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { } else { response.ClusterId = clusterID response.Shards = shards - // TODO: remove with nwaku 0.28 deployment - response.ShardsDeprecated = shards // nolint: staticcheck } err = writer.WriteMsg(response) @@ -245,14 +241,6 @@ func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { rClusterID := uint16(*response.ClusterId) var rs protocol.RelayShards - if _, err = wakuM.h.Peerstore().SupportsProtocols(peerID, relay.WakuRelayID_v200); err == nil { - wakuM.log.Debug("light peer only checking clusterID") - if rClusterID != wakuM.clusterID { - wakuM.disconnectPeer(peerID, errors.New("different clusterID reported")) - } - return - } - wakuM.log.Debug("relay peer checking cluster and shards") var rShardIDs []uint16 @@ -261,9 +249,12 @@ func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { rShardIDs = append(rShardIDs, uint16(i)) } } else { - // TODO: remove with nwaku 0.28 deployment - for _, i := range response.ShardsDeprecated { // nolint: staticcheck - rShardIDs = append(rShardIDs, uint16(i)) + if proto, err := wakuM.h.Peerstore().FirstSupportedProtocol(peerID, relay.WakuRelayID_v200); err == nil && proto == "" { + wakuM.log.Debug("light peer only checking clusterID") + if rClusterID != wakuM.clusterID { + wakuM.disconnectPeer(peerID, errors.New("different clusterID reported")) + } + return } } wakuM.log.Debug("getting remote cluster and shards")