feat: update go-waku to introduce meta field and deterministic hashing

This commit is contained in:
Richard Ramos 2023-03-08 11:05:46 -04:00 committed by RichΛrd
parent bac7eb08ca
commit cbb016ac00
22 changed files with 283 additions and 121 deletions

View File

@ -1 +1 @@
0.137.2
0.138.0

2
go.mod
View File

@ -80,7 +80,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ladydascalie/currency v1.6.0
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567
github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
)

4
go.sum
View File

@ -2100,8 +2100,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567 h1:8r7Y1hOmtcPaKKSDT0RaJCNG4HWDZ11a4/tyfbPE260=
github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567/go.mod h1:Uz6WhNbCtbM8fSr0wb8apqhAPQYKvOPoyaGOHdw9DkU=
github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483 h1:WB7CnxOpd99PxPE+mpNC4y2sdwDE263O6qgiDyRIYjY=
github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483/go.mod h1:Uz6WhNbCtbM8fSr0wb8apqhAPQYKvOPoyaGOHdw9DkU=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o=

View File

@ -4,7 +4,7 @@ import (
"encoding/binary"
"errors"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/hash"
)
const (
@ -35,7 +35,7 @@ func (k *DBKey) Bytes() []byte {
// NewDBKey creates a new DBKey with the given values.
func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey {
pubSubHash := utils.SHA256([]byte(pubsubTopic))
pubSubHash := hash.SHA256([]byte(pubsubTopic))
var k DBKey
k.raw = make([]byte, DBKeyLength)

View File

@ -1,4 +1,4 @@
package utils
package hash
import (
"crypto/sha256"
@ -10,16 +10,15 @@ var sha256Pool = sync.Pool{New: func() interface{} {
return sha256.New()
}}
func SHA256(data []byte) []byte {
func SHA256(data ...[]byte) []byte {
h, ok := sha256Pool.Get().(hash.Hash)
if !ok {
h = sha256.New()
}
defer sha256Pool.Put(h)
h.Reset()
var result [32]byte
h.Write(data)
h.Sum(result[:0])
return result[:]
for i := range data {
h.Write(data[i])
}
return h.Sum(nil)
}

View File

@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)
w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
@ -560,7 +560,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
return errors.New("cannot publish message, relay and lightpush are disabled")
}
hash, _, _ := msg.Hash()
hash := msg.Hash(relay.DefaultWakuTopic)
err := try.Do(func(attempt int) (bool, error) {
var err error

View File

@ -26,6 +26,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
@ -68,6 +69,7 @@ type WakuNodeParameters struct {
enableFilterV2LightNode bool
enableFilterV2FullNode bool
filterOpts []filter.Option
filterV2Opts []filterv2.Option
wOpts []pubsub.Option
minRelayPeersToPublish int
@ -335,10 +337,10 @@ func WithWakuFilterV2LightNode() WakuNodeOption {
// WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterV2FullNode(filterOpts ...filter.Option) WakuNodeOption {
func WithWakuFilterV2FullNode(filterOpts ...filterv2.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterV2FullNode = true
params.filterOpts = filterOpts
params.filterV2Opts = filterOpts
return nil
}
}

View File

@ -1,9 +1,9 @@
package protocol
import (
"github.com/waku-org/go-waku/waku/v2/hash"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// Envelope contains information about the pubsub topic of a WakuMessage
@ -11,7 +11,6 @@ import (
// protobuffer
type Envelope struct {
msg *wpb.WakuMessage
size int
hash []byte
index *pb.Index
}
@ -20,11 +19,10 @@ type Envelope struct {
// It's used as a way to know to which Pubsub topic belongs a WakuMessage
// as well as generating a hash based on the bytes that compose the message
func NewEnvelope(msg *wpb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope {
messageHash, dataLen, _ := msg.Hash()
hash := utils.SHA256(append([]byte(msg.ContentTopic), msg.Payload...))
messageHash := msg.Hash(pubSubTopic)
hash := hash.SHA256([]byte(msg.ContentTopic), msg.Payload)
return &Envelope{
msg: msg,
size: dataLen,
hash: messageHash,
index: &pb.Index{
Digest: hash[:],
@ -50,11 +48,6 @@ func (e *Envelope) Hash() []byte {
return e.hash
}
// Size returns the byte size of the WakuMessage
func (e *Envelope) Size() int {
return e.size
}
func (env *Envelope) Index() *pb.Index {
return env.index
}

View File

@ -0,0 +1,4 @@
package filterv2
const DefaultMaxSubscriptions = 1000
const MaxCriteriaPerSubscription = 1000

View File

@ -2,6 +2,7 @@ package filterv2
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@ -25,10 +26,23 @@ type (
log *zap.Logger
}
FilterParameters struct {
Timeout time.Duration
MaxSubscribers int
}
Option func(*FilterParameters)
FilterSubscribeOption func(*FilterSubscribeParameters)
FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
)
func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.Timeout = timeout
}
}
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
@ -112,3 +126,16 @@ func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
AutomaticRequestId(),
}
}
func WithMaxSubscribers(maxSubscribers int) Option {
return func(params *FilterParameters) {
params.MaxSubscribers = maxSubscribers
}
}
func DefaultOptions() []Option {
return []Option{
WithTimeout(24 * time.Hour),
WithMaxSubscribers(DefaultMaxSubscriptions),
}
}

View File

@ -16,7 +16,6 @@ import (
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag"
@ -27,6 +26,8 @@ import (
// allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria
const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1")
const peerHasNoSubscription = "peer has no subscriptions"
type (
WakuFilterFull struct {
cancel context.CancelFunc
@ -36,16 +37,18 @@ type (
log *zap.Logger
subscriptions *SubscribersMap
maxSubscriptions int
}
)
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull {
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull {
wf := new(WakuFilterFull)
wf.log = log.Named("filterv2-fullnode")
params := new(filter.FilterParameters)
optList := filter.DefaultOptions()
params := new(FilterParameters)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
@ -54,6 +57,7 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc
wf.wg = &sync.WaitGroup{}
wf.h = host
wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers
return wf
}
@ -138,7 +142,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb
if exists {
reply(s, logger, request, http.StatusOK)
} else {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
}
}
@ -153,8 +157,25 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques
return
}
if wf.subscriptions.Count() >= wf.maxSubscriptions {
reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
}
peerID := s.Conn().RemotePeer()
if totalSubs, exists := wf.subscriptions.Get(peerID); exists {
ctTotal := 0
for _, contentTopicSet := range totalSubs {
ctTotal += len(contentTopicSet)
}
if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription {
reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
return
}
}
wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics)
reply(s, logger, request, http.StatusOK)
@ -173,7 +194,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
if err != nil {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
}
@ -182,7 +203,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
}

View File

@ -161,6 +161,13 @@ func (sub *SubscribersMap) RemoveAll() {
sub.items = make(map[peer.ID]PubsubTopics)
}
func (sub *SubscribersMap) Count() int {
sub.RLock()
defer sub.RUnlock()
return len(sub.items)
}
func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID)

View File

@ -216,7 +216,7 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa
}
if response.IsSuccess {
hash, _, _ := message.Hash()
hash := message.Hash(topic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
return hash, nil
} else {

View File

@ -1,23 +1,8 @@
package pb
import (
"crypto/sha256"
proto "google.golang.org/protobuf/proto"
)
import "github.com/waku-org/go-waku/waku/v2/hash"
// Hash calculates the hash of a waku message
func (msg *WakuMessage) Hash() ([]byte, int, error) {
out, err := proto.Marshal(msg)
if err != nil {
return nil, 0, err
}
return Hash(out), len(out), nil
}
// Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm
func Hash(data []byte) []byte {
hash := sha256.Sum256(data)
return hash[:]
func (msg *WakuMessage) Hash(pubsubTopic string) []byte {
return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta)
}

View File

@ -124,6 +124,7 @@ type WakuMessage struct {
ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"`
Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Meta []byte `protobuf:"bytes,11,opt,name=meta,proto3" json:"meta,omitempty"`
RateLimitProof *RateLimitProof `protobuf:"bytes,21,opt,name=rate_limit_proof,json=rateLimitProof,proto3" json:"rate_limit_proof,omitempty"`
Ephemeral bool `protobuf:"varint,31,opt,name=ephemeral,proto3" json:"ephemeral,omitempty"`
}
@ -188,6 +189,13 @@ func (x *WakuMessage) GetTimestamp() int64 {
return 0
}
func (x *WakuMessage) GetMeta() []byte {
if x != nil {
return x.Meta
}
return nil
}
func (x *WakuMessage) GetRateLimitProof() *RateLimitProof {
if x != nil {
return x.RateLimitProof
@ -220,7 +228,7 @@ var file_waku_message_proto_rawDesc = []byte{
0x75, 0x6c, 0x6c, 0x69, 0x66, 0x69, 0x65, 0x72, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x6c, 0x6e, 0x5f,
0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x0d, 0x72, 0x6c, 0x6e, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x22,
0xdf, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12,
0xf3, 0x01, 0x0a, 0x0b, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12,
0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6e,
0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
@ -228,13 +236,14 @@ var file_waku_message_proto_rawDesc = []byte{
0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07,
0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x12, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65,
0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3c, 0x0a, 0x10, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69,
0x6d, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x15, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x12, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x50, 0x72,
0x6f, 0x6f, 0x66, 0x52, 0x0e, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x50, 0x72,
0x6f, 0x6f, 0x66, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d, 0x65, 0x72, 0x61, 0x6c,
0x18, 0x1f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d, 0x65, 0x72, 0x61,
0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x0b, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x12, 0x3c, 0x0a, 0x10, 0x72, 0x61, 0x74,
0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x15, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d,
0x69, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x0e, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d,
0x69, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x70, 0x68, 0x65, 0x6d,
0x65, 0x72, 0x61, 0x6c, 0x18, 0x1f, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x65, 0x70, 0x68, 0x65,
0x6d, 0x65, 0x72, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -17,6 +17,7 @@ message WakuMessage {
string contentTopic = 2;
uint32 version = 3;
sint64 timestamp = 10;
bytes meta = 11;
RateLimitProof rate_limit_proof = 21;
bool ephemeral = 31;
}

View File

@ -18,11 +18,11 @@ import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/hash"
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
@ -52,7 +52,7 @@ type WakuRelay struct {
}
func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(utils.SHA256(pmsg.Data))
return string(hash.SHA256(pmsg.Data))
}
// NewWakuRelay returns a new instance of a WakuRelay struct
@ -190,7 +190,7 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
return nil, err
}
hash := pb.Hash(out)
hash := message.Hash(topic)
w.log.Debug("waku.relay published", zap.String("hash", hex.EncodeToString(hash)))

View File

@ -314,13 +314,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
return nil, errors.New("invalid cursor")
}
var messageIDs [][]byte
for _, m := range response.Messages {
messageID, _, _ := m.Hash()
messageIDs = append(messageIDs, messageID)
}
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs))
result := &Result{
store: store,
Messages: response.Messages,

View File

@ -3,11 +3,22 @@ package protocol
import (
"errors"
"fmt"
"runtime/debug"
"strconv"
"strings"
)
const Waku2PubsubTopicPrefix = "/waku/2"
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidStructure = errors.New("invalid topic structure")
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
type ContentTopic struct {
ApplicationName string
@ -54,38 +65,156 @@ func StringToContentTopic(s string) (ContentTopic, error) {
}, nil
}
type PubsubTopic struct {
Name string
Encoding string
type NamespacedPubsubTopicKind int
const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)
type ShardedPubsubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(ShardedPubsubTopic) bool
}
func (t PubsubTopic) String() string {
return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding)
type NamedShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}
func DefaultPubsubTopic() PubsubTopic {
return NewPubsubTopic("default-waku", "proto")
}
func NewPubsubTopic(name string, encoding string) PubsubTopic {
return PubsubTopic{
Name: name,
Encoding: encoding,
func NewNamedShardingPubsubTopic(name string) ShardedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}
func (t PubsubTopic) Equal(t2 PubsubTopic) bool {
return t.Name == t2.Name && t.Encoding == t2.Encoding
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
func StringToPubsubTopic(s string) (PubsubTopic, error) {
p := strings.Split(s, "/")
if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" {
return PubsubTopic{}, ErrInvalidFormat
func (n NamedShardingPubsubTopic) Name() string {
return n.name
}
func (s NamedShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
}
func (s *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}
return PubsubTopic{
Name: p[3],
Encoding: p[4],
}, nil
topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}
s.kind = NamedSharding
s.name = topicName
return nil
}
type StaticShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
}
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) ShardedPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
}
}
func (n StaticShardingPubsubTopic) Cluster() uint16 {
return n.cluster
}
func (n StaticShardingPubsubTopic) Shard() uint16 {
return n.shard
}
func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
func (s StaticShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}
func (n StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard)
}
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
fmt.Println(topic, StaticShardingPubsubTopicPrefix)
return ErrInvalidShardedTopicPrefix
}
parts := strings.Split(topic[11:], "/")
if len(parts) != 2 {
return ErrInvalidStructure
}
clusterPart := parts[0]
if len(clusterPart) == 0 {
return ErrMissingClusterIndex
}
clusterInt, err := strconv.ParseUint(clusterPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
shardPart := parts[1]
if len(shardPart) == 0 {
return ErrMissingShardNumber
}
shardInt, err := strconv.ParseUint(shardPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding
return nil
}
func ToShardedPubsubTopic(topic string) (ShardedPubsubTopic, error) {
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
} else {
debug.PrintStack()
s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}
}
func DefaultPubsubTopic() ShardedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
}

3
vendor/modules.txt vendored
View File

@ -971,7 +971,7 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.5.2-0.20230302181640-4c385249f567
# github.com/waku-org/go-waku v0.5.2-0.20230308135126-4b52983fc483
## explicit; go 1.19
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence
@ -979,6 +979,7 @@ github.com/waku-org/go-waku/waku/try
github.com/waku-org/go-waku/waku/v2
github.com/waku-org/go-waku/waku/v2/discv5
github.com/waku-org/go-waku/waku/v2/dnsdisc
github.com/waku-org/go-waku/waku/v2/hash
github.com/waku-org/go-waku/waku/v2/metrics
github.com/waku-org/go-waku/waku/v2/node
github.com/waku-org/go-waku/waku/v2/payload

View File

@ -253,6 +253,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
Version: version,
ContentTopic: req.Topic.ContentTopic(),
Timestamp: api.w.timestamp(),
Meta: []byte{}, // TODO: empty for now. Once we use Waku Archive v2, we should deprecate the timestamp and use an ULID here
Ephemeral: req.Ephemeral,
}

View File

@ -116,7 +116,7 @@ type Waku struct {
bandwidthCounter *metrics.BandwidthCounter
sendQueue chan *pb.WakuMessage
sendQueue chan *protocol.Envelope
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
quit chan struct{} // Channel used for graceful exit
wg sync.WaitGroup
@ -193,7 +193,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *pb.WakuMessage, 1000),
sendQueue: make(chan *protocol.Envelope, 1000),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
quit: make(chan struct{}),
connectionChanged: make(chan struct{}),
@ -1093,26 +1093,20 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
func (w *Waku) broadcast() {
for {
select {
case msg := <-w.sendQueue:
hash, _, err := msg.Hash()
if err != nil {
w.logger.Error("invalid message", zap.Error(err))
continue
}
case envelope := <-w.sendQueue:
var err error
if w.settings.LightClient {
w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(hash)))
_, err = w.node.Lightpush().Publish(context.Background(), msg)
w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.node.Lightpush().Publish(context.Background(), envelope.Message())
} else {
w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(hash)))
_, err = w.node.Relay().Publish(context.Background(), msg)
w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.node.Relay().Publish(context.Background(), envelope.Message())
}
if err != nil {
w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(hash)), zap.Error(err))
w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.Error(err))
w.envelopeFeed.Send(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(hash),
Hash: gethcommon.BytesToHash(envelope.Hash()),
Event: common.EventEnvelopeExpired,
})
@ -1121,7 +1115,7 @@ func (w *Waku) broadcast() {
event := common.EnvelopeEvent{
Event: common.EventEnvelopeSent,
Hash: gethcommon.BytesToHash(hash),
Hash: gethcommon.BytesToHash(envelope.Hash()),
}
w.SendEnvelopeEvent(event)
@ -1135,24 +1129,20 @@ func (w *Waku) broadcast() {
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
hash, _, err := msg.Hash()
if err != nil {
return nil, err
}
envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic) // TODO: once sharding is defined, use the correct pubsub topic
w.sendQueue <- msg
w.sendQueue <- envelope
w.poolMu.Lock()
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)]
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(envelope.Hash())]
w.poolMu.Unlock()
if !alreadyCached {
envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic)
recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType)
w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage)
}
return hash, nil
return envelope.Hash(), nil
}
func (w *Waku) query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (*store.Result, error) {
@ -1289,7 +1279,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage) (bool, error) {
} else {
w.logger.Debug("cached w envelope", zap.String("envelopeHash", recvMessage.Hash().Hex()))
common.EnvelopesCachedCounter.WithLabelValues("miss").Inc()
common.EnvelopesSizeMeter.Observe(float64(recvMessage.Envelope.Size()))
common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload)))
w.postEvent(recvMessage) // notify the local node about the new message
}
return true, nil