diff --git a/VERSION b/VERSION index 7d07dbba4..d3c4350bd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.137.2 +0.138.0 diff --git a/go.mod b/go.mod index 62bfd60a2..05bbc8e33 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 github.com/ladydascalie/currency v1.6.0 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 - github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567 + github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 ) diff --git a/go.sum b/go.sum index 61a1df4fb..e55a9dd85 100644 --- a/go.sum +++ b/go.sum @@ -2100,8 +2100,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= -github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567 h1:8r7Y1hOmtcPaKKSDT0RaJCNG4HWDZ11a4/tyfbPE260= -github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567/go.mod h1:Uz6WhNbCtbM8fSr0wb8apqhAPQYKvOPoyaGOHdw9DkU= +github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483 h1:WB7CnxOpd99PxPE+mpNC4y2sdwDE263O6qgiDyRIYjY= +github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483/go.mod h1:Uz6WhNbCtbM8fSr0wb8apqhAPQYKvOPoyaGOHdw9DkU= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk= github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o= diff --git a/vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go b/vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go index 754fb6c36..80b214d71 100644 --- a/vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go +++ b/vendor/github.com/waku-org/go-waku/waku/persistence/db_key.go @@ -4,7 +4,7 @@ import ( "encoding/binary" "errors" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/hash" ) const ( @@ -35,7 +35,7 @@ func (k *DBKey) Bytes() []byte { // NewDBKey creates a new DBKey with the given values. func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey { - pubSubHash := utils.SHA256([]byte(pubsubTopic)) + pubSubHash := hash.SHA256([]byte(pubsubTopic)) var k DBKey k.raw = make([]byte, DBKeyLength) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/hash.go b/vendor/github.com/waku-org/go-waku/waku/v2/hash/hash.go similarity index 65% rename from vendor/github.com/waku-org/go-waku/waku/v2/utils/hash.go rename to vendor/github.com/waku-org/go-waku/waku/v2/hash/hash.go index 4d27bc08d..dc6462bd9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/utils/hash.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/hash/hash.go @@ -1,4 +1,4 @@ -package utils +package hash import ( "crypto/sha256" @@ -10,16 +10,15 @@ var sha256Pool = sync.Pool{New: func() interface{} { return sha256.New() }} -func SHA256(data []byte) []byte { +func SHA256(data ...[]byte) []byte { h, ok := sha256Pool.Get().(hash.Hash) if !ok { h = sha256.New() } defer sha256Pool.Put(h) h.Reset() - - var result [32]byte - h.Write(data) - h.Sum(result[:0]) - return result[:] + for i := range data { + h.Write(data[i]) + } + return h.Sum(nil) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index e4e3cd651..3e9671ed0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) - w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) @@ -560,7 +560,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { return errors.New("cannot publish message, relay and lightpush are disabled") } - hash, _, _ := msg.Hash() + hash := msg.Hash(relay.DefaultWakuTopic) err := try.Do(func(attempt int) (bool, error) { var err error diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index ae59ec9ec..c164056e5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -26,6 +26,7 @@ import ( ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/filterv2" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -68,6 +69,7 @@ type WakuNodeParameters struct { enableFilterV2LightNode bool enableFilterV2FullNode bool filterOpts []filter.Option + filterV2Opts []filterv2.Option wOpts []pubsub.Option minRelayPeersToPublish int @@ -335,10 +337,10 @@ func WithWakuFilterV2LightNode() WakuNodeOption { // WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality. // This WakuNodeOption accepts a list of WakuFilter options to setup the protocol -func WithWakuFilterV2FullNode(filterOpts ...filter.Option) WakuNodeOption { +func WithWakuFilterV2FullNode(filterOpts ...filterv2.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilterV2FullNode = true - params.filterOpts = filterOpts + params.filterV2Opts = filterOpts return nil } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/envelope.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/envelope.go index 469915ed7..e43db035b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/envelope.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/envelope.go @@ -1,9 +1,9 @@ package protocol import ( + "github.com/waku-org/go-waku/waku/v2/hash" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) // Envelope contains information about the pubsub topic of a WakuMessage @@ -11,7 +11,6 @@ import ( // protobuffer type Envelope struct { msg *wpb.WakuMessage - size int hash []byte index *pb.Index } @@ -20,11 +19,10 @@ type Envelope struct { // It's used as a way to know to which Pubsub topic belongs a WakuMessage // as well as generating a hash based on the bytes that compose the message func NewEnvelope(msg *wpb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope { - messageHash, dataLen, _ := msg.Hash() - hash := utils.SHA256(append([]byte(msg.ContentTopic), msg.Payload...)) + messageHash := msg.Hash(pubSubTopic) + hash := hash.SHA256([]byte(msg.ContentTopic), msg.Payload) return &Envelope{ msg: msg, - size: dataLen, hash: messageHash, index: &pb.Index{ Digest: hash[:], @@ -50,11 +48,6 @@ func (e *Envelope) Hash() []byte { return e.hash } -// Size returns the byte size of the WakuMessage -func (e *Envelope) Size() int { - return e.size -} - func (env *Envelope) Index() *pb.Index { return env.index } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/common.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/common.go new file mode 100644 index 000000000..50046f636 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/common.go @@ -0,0 +1,4 @@ +package filterv2 + +const DefaultMaxSubscriptions = 1000 +const MaxCriteriaPerSubscription = 1000 diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/options.go index 56d582ac5..104b84a80 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/options.go @@ -2,6 +2,7 @@ package filterv2 import ( "context" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -25,10 +26,23 @@ type ( log *zap.Logger } + FilterParameters struct { + Timeout time.Duration + MaxSubscribers int + } + + Option func(*FilterParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) FilterUnsubscribeOption func(*FilterUnsubscribeParameters) ) +func WithTimeout(timeout time.Duration) Option { + return func(params *FilterParameters) { + params.Timeout = timeout + } +} + func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.selectedPeer = p @@ -112,3 +126,16 @@ func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { AutomaticRequestId(), } } + +func WithMaxSubscribers(maxSubscribers int) Option { + return func(params *FilterParameters) { + params.MaxSubscribers = maxSubscribers + } +} + +func DefaultOptions() []Option { + return []Option{ + WithTimeout(24 * time.Hour), + WithMaxSubscribers(DefaultMaxSubscriptions), + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/server.go index 761b52e0c..23d00f85b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/server.go @@ -16,7 +16,6 @@ import ( v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/tag" @@ -27,6 +26,8 @@ import ( // allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") +const peerHasNoSubscription = "peer has no subscriptions" + type ( WakuFilterFull struct { cancel context.CancelFunc @@ -36,16 +37,18 @@ type ( log *zap.Logger subscriptions *SubscribersMap + + maxSubscriptions int } ) // NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull { +func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull { wf := new(WakuFilterFull) wf.log = log.Named("filterv2-fullnode") - params := new(filter.FilterParameters) - optList := filter.DefaultOptions() + params := new(FilterParameters) + optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { opt(params) @@ -54,6 +57,7 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc wf.wg = &sync.WaitGroup{} wf.h = host wf.subscriptions = NewSubscribersMap(params.Timeout) + wf.maxSubscriptions = params.MaxSubscribers return wf } @@ -138,7 +142,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb if exists { reply(s, logger, request, http.StatusOK) } else { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } } @@ -153,8 +157,25 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques return } + if wf.subscriptions.Count() >= wf.maxSubscriptions { + reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") + return + } + peerID := s.Conn().RemotePeer() + if totalSubs, exists := wf.subscriptions.Get(peerID); exists { + ctTotal := 0 + for _, contentTopicSet := range totalSubs { + ctTotal += len(contentTopicSet) + } + + if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription { + reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria") + return + } + } + wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics) reply(s, logger, request, http.StatusOK) @@ -173,7 +194,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) if err != nil { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } else { reply(s, logger, request, http.StatusOK) } @@ -182,7 +203,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer()) if err != nil { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } else { reply(s, logger, request, http.StatusOK) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/subscribers_map.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/subscribers_map.go index 767056f55..04e65cb3f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/subscribers_map.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filterv2/subscribers_map.go @@ -161,6 +161,13 @@ func (sub *SubscribersMap) RemoveAll() { sub.items = make(map[peer.ID]PubsubTopics) } +func (sub *SubscribersMap) Count() int { + sub.RLock() + defer sub.RUnlock() + + return len(sub.items) +} + func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { c := make(chan peer.ID) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 6ef8186ac..7ecd7a3eb 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -216,7 +216,7 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa } if response.IsSuccess { - hash, _, _ := message.Hash() + hash := message.Hash(topic) wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash)) return hash, nil } else { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go index 4e19483fe..0459a473f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/utils.go @@ -1,23 +1,8 @@ package pb -import ( - "crypto/sha256" - - proto "google.golang.org/protobuf/proto" -) +import "github.com/waku-org/go-waku/waku/v2/hash" // Hash calculates the hash of a waku message -func (msg *WakuMessage) Hash() ([]byte, int, error) { - out, err := proto.Marshal(msg) - if err != nil { - return nil, 0, err - } - - return Hash(out), len(out), nil -} - -// Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm -func Hash(data []byte) []byte { - hash := sha256.Sum256(data) - return hash[:] +func (msg *WakuMessage) Hash(pubsubTopic string) []byte { + return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.pb.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.pb.go index ec6d6a182..9cd7502f6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.pb.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.pb.go @@ -124,6 +124,7 @@ type WakuMessage struct { ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"` Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Meta []byte `protobuf:"bytes,11,opt,name=meta,proto3" json:"meta,omitempty"` RateLimitProof *RateLimitProof `protobuf:"bytes,21,opt,name=rate_limit_proof,json=rateLimitProof,proto3" json:"rate_limit_proof,omitempty"` Ephemeral bool `protobuf:"varint,31,opt,name=ephemeral,proto3" json:"ephemeral,omitempty"` } @@ -188,6 +189,13 @@ func (x *WakuMessage) GetTimestamp() int64 { return 0 } +func (x *WakuMessage) GetMeta() []byte { + if x != nil { + return x.Meta + } + return nil +} + func (x *WakuMessage) GetRateLimitProof() *RateLimitProof { if x != nil { return x.RateLimitProof @@ -220,7 +228,7 @@ var file_waku_message_proto_rawDesc = []byte{ 0x75, 0x6c, 0x6c, 0x69, 0x66, 0x69, 0x65, 0x72, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x6c, 0x6e, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x6c, 0x6e, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x22, - 0xdf, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0xf3, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, @@ -228,13 +236,14 @@ var file_waku_message_proto_rawDesc = []byte{ 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x12, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3c, 0x0a, 0x10, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, - 0x6d, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x15, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x50, 0x72, - 0x6f, 0x6f, 0x66, 0x52, 0x0e, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x50, 0x72, - 0x6f, 0x6f, 0x66, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d, 0x65, 0x72, 0x61, 0x6c, - 0x18, 0x1f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d, 0x65, 0x72, 0x61, - 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x12, 0x3c, 0x0a, 0x10, 0x72, 0x61, 0x74, + 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x15, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, + 0x69, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x0e, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, + 0x69, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d, + 0x65, 0x72, 0x61, 0x6c, 0x18, 0x1f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x70, 0x68, 0x65, + 0x6d, 0x65, 0x72, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.proto b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.proto index 8baade225..d8a650ea0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.proto +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/pb/waku_message.proto @@ -17,6 +17,7 @@ message WakuMessage { string contentTopic = 2; uint32 version = 3; sint64 timestamp = 10; + bytes meta = 11; RateLimitProof rate_limit_proof = 21; bool ephemeral = 31; } \ No newline at end of file diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index ccc4c0279..a62c13045 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -18,11 +18,11 @@ import ( pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/waku-org/go-waku/logging" v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/hash" "github.com/waku-org/go-waku/waku/v2/metrics" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" ) const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") @@ -52,7 +52,7 @@ type WakuRelay struct { } func msgIdFn(pmsg *pubsub_pb.Message) string { - return string(utils.SHA256(pmsg.Data)) + return string(hash.SHA256(pmsg.Data)) } // NewWakuRelay returns a new instance of a WakuRelay struct @@ -190,7 +190,7 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, err } - hash := pb.Hash(out) + hash := message.Hash(topic) w.log.Debug("waku.relay published", zap.String("hash", hex.EncodeToString(hash))) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go index 7a800540c..0962b73ab 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go @@ -314,13 +314,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR return nil, errors.New("invalid cursor") } - var messageIDs [][]byte - for _, m := range response.Messages { - messageID, _, _ := m.Hash() - messageIDs = append(messageIDs, messageID) - } - store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs)) - result := &Result{ store: store, Messages: response.Messages, diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/topic.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/topic.go index 2c9aba4ff..b5f6a7fb1 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/topic.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/topic.go @@ -3,11 +3,22 @@ package protocol import ( "errors" "fmt" + "runtime/debug" "strconv" "strings" ) +const Waku2PubsubTopicPrefix = "/waku/2" +const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs" + var ErrInvalidFormat = errors.New("invalid format") +var ErrInvalidStructure = errors.New("invalid topic structure") +var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix) +var ErrMissingTopicName = errors.New("missing topic-name") +var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix) +var ErrMissingClusterIndex = errors.New("missing shard_cluster_index") +var ErrMissingShardNumber = errors.New("missing shard_number") +var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") type ContentTopic struct { ApplicationName string @@ -54,38 +65,156 @@ func StringToContentTopic(s string) (ContentTopic, error) { }, nil } -type PubsubTopic struct { - Name string - Encoding string +type NamespacedPubsubTopicKind int + +const ( + StaticSharding NamespacedPubsubTopicKind = iota + NamedSharding +) + +type ShardedPubsubTopic interface { + String() string + Kind() NamespacedPubsubTopicKind + Equal(ShardedPubsubTopic) bool } -func (t PubsubTopic) String() string { - return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding) +type NamedShardingPubsubTopic struct { + ShardedPubsubTopic + kind NamespacedPubsubTopicKind + name string } -func DefaultPubsubTopic() PubsubTopic { - return NewPubsubTopic("default-waku", "proto") -} - -func NewPubsubTopic(name string, encoding string) PubsubTopic { - return PubsubTopic{ - Name: name, - Encoding: encoding, +func NewNamedShardingPubsubTopic(name string) ShardedPubsubTopic { + return NamedShardingPubsubTopic{ + kind: NamedSharding, + name: name, } } -func (t PubsubTopic) Equal(t2 PubsubTopic) bool { - return t.Name == t2.Name && t.Encoding == t2.Encoding +func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { + return n.kind } -func StringToPubsubTopic(s string) (PubsubTopic, error) { - p := strings.Split(s, "/") - if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" { - return PubsubTopic{}, ErrInvalidFormat +func (n NamedShardingPubsubTopic) Name() string { + return n.name +} + +func (s NamedShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool { + return s.String() == t2.String() +} + +func (n NamedShardingPubsubTopic) String() string { + return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name) +} + +func (s *NamedShardingPubsubTopic) Parse(topic string) error { + if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) { + return ErrInvalidTopicPrefix } - return PubsubTopic{ - Name: p[3], - Encoding: p[4], - }, nil + topicName := topic[8:] + if len(topicName) == 0 { + return ErrMissingTopicName + } + + s.kind = NamedSharding + s.name = topicName + + return nil +} + +type StaticShardingPubsubTopic struct { + ShardedPubsubTopic + kind NamespacedPubsubTopicKind + cluster uint16 + shard uint16 +} + +func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) ShardedPubsubTopic { + return StaticShardingPubsubTopic{ + kind: StaticSharding, + cluster: cluster, + shard: shard, + } +} + +func (n StaticShardingPubsubTopic) Cluster() uint16 { + return n.cluster +} + +func (n StaticShardingPubsubTopic) Shard() uint16 { + return n.shard +} + +func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { + return n.kind +} + +func (s StaticShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool { + return s.String() == t2.String() +} + +func (n StaticShardingPubsubTopic) String() string { + return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard) +} + +func (s *StaticShardingPubsubTopic) Parse(topic string) error { + if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { + fmt.Println(topic, StaticShardingPubsubTopicPrefix) + return ErrInvalidShardedTopicPrefix + } + + parts := strings.Split(topic[11:], "/") + if len(parts) != 2 { + return ErrInvalidStructure + } + + clusterPart := parts[0] + if len(clusterPart) == 0 { + return ErrMissingClusterIndex + } + + clusterInt, err := strconv.ParseUint(clusterPart, 10, 16) + if err != nil { + return ErrInvalidNumberFormat + } + + shardPart := parts[1] + if len(shardPart) == 0 { + return ErrMissingShardNumber + } + + shardInt, err := strconv.ParseUint(shardPart, 10, 16) + if err != nil { + return ErrInvalidNumberFormat + } + + s.shard = uint16(shardInt) + s.cluster = uint16(clusterInt) + s.kind = StaticSharding + + return nil +} + +func ToShardedPubsubTopic(topic string) (ShardedPubsubTopic, error) { + if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { + s := StaticShardingPubsubTopic{} + err := s.Parse(topic) + if err != nil { + return nil, err + } + return s, nil + } else { + debug.PrintStack() + s := NamedShardingPubsubTopic{} + err := s.Parse(topic) + if err != nil { + return nil, err + } + return s, nil + } +} + +func DefaultPubsubTopic() ShardedPubsubTopic { + return NewNamedShardingPubsubTopic("default-waku/proto") } diff --git a/vendor/modules.txt b/vendor/modules.txt index bce322d9e..a913eeb8b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -971,7 +971,7 @@ github.com/vacp2p/mvds/transport github.com/waku-org/go-discover/discover github.com/waku-org/go-discover/discover/v4wire github.com/waku-org/go-discover/discover/v5wire -# github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567 +# github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483 ## explicit; go 1.19 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/waku/persistence @@ -979,6 +979,7 @@ github.com/waku-org/go-waku/waku/try github.com/waku-org/go-waku/waku/v2 github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc +github.com/waku-org/go-waku/waku/v2/hash github.com/waku-org/go-waku/waku/v2/metrics github.com/waku-org/go-waku/waku/v2/node github.com/waku-org/go-waku/waku/v2/payload diff --git a/wakuv2/api.go b/wakuv2/api.go index 48bea7888..fa024e24f 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -253,6 +253,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt Version: version, ContentTopic: req.Topic.ContentTopic(), Timestamp: api.w.timestamp(), + Meta: []byte{}, // TODO: empty for now. Once we use Waku Archive v2, we should deprecate the timestamp and use an ULID here Ephemeral: req.Ephemeral, } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index eee352496..17838549f 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -116,7 +116,7 @@ type Waku struct { bandwidthCounter *metrics.BandwidthCounter - sendQueue chan *pb.WakuMessage + sendQueue chan *protocol.Envelope msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded quit chan struct{} // Channel used for graceful exit wg sync.WaitGroup @@ -193,7 +193,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), expirations: make(map[uint32]mapset.Set), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), - sendQueue: make(chan *pb.WakuMessage, 1000), + sendQueue: make(chan *protocol.Envelope, 1000), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), quit: make(chan struct{}), connectionChanged: make(chan struct{}), @@ -1093,26 +1093,20 @@ func (w *Waku) UnsubscribeMany(ids []string) error { func (w *Waku) broadcast() { for { select { - case msg := <-w.sendQueue: - - hash, _, err := msg.Hash() - if err != nil { - w.logger.Error("invalid message", zap.Error(err)) - continue - } - + case envelope := <-w.sendQueue: + var err error if w.settings.LightClient { - w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(hash))) - _, err = w.node.Lightpush().Publish(context.Background(), msg) + w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(envelope.Hash()))) + _, err = w.node.Lightpush().Publish(context.Background(), envelope.Message()) } else { - w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(hash))) - _, err = w.node.Relay().Publish(context.Background(), msg) + w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(envelope.Hash()))) + _, err = w.node.Relay().Publish(context.Background(), envelope.Message()) } if err != nil { - w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(hash)), zap.Error(err)) + w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.Error(err)) w.envelopeFeed.Send(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(hash), + Hash: gethcommon.BytesToHash(envelope.Hash()), Event: common.EventEnvelopeExpired, }) @@ -1121,7 +1115,7 @@ func (w *Waku) broadcast() { event := common.EnvelopeEvent{ Event: common.EventEnvelopeSent, - Hash: gethcommon.BytesToHash(hash), + Hash: gethcommon.BytesToHash(envelope.Hash()), } w.SendEnvelopeEvent(event) @@ -1135,24 +1129,20 @@ func (w *Waku) broadcast() { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { - hash, _, err := msg.Hash() - if err != nil { - return nil, err - } + envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic) // TODO: once sharding is defined, use the correct pubsub topic - w.sendQueue <- msg + w.sendQueue <- envelope w.poolMu.Lock() - _, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)] + _, alreadyCached := w.envelopes[gethcommon.BytesToHash(envelope.Hash())] w.poolMu.Unlock() if !alreadyCached { - envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic) recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType) w.postEvent(recvMessage) // notify the local node about the new message w.addEnvelope(recvMessage) } - return hash, nil + return envelope.Hash(), nil } func (w *Waku) query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (*store.Result, error) { @@ -1289,7 +1279,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) { } else { w.logger.Debug("cached w envelope", zap.String("envelopeHash", recvMessage.Hash().Hex())) common.EnvelopesCachedCounter.WithLabelValues("miss").Inc() - common.EnvelopesSizeMeter.Observe(float64(recvMessage.Envelope.Size())) + common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload))) w.postEvent(recvMessage) // notify the local node about the new message } return true, nil