diff --git a/go.mod b/go.mod index 26423d57c..069d4458b 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20220303160403-f4f307db8734 + github.com/status-im/go-waku v0.0.0-20220323131654-8468323cd349 github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index be089a26f..0ea464df5 100644 --- a/go.sum +++ b/go.sum @@ -1214,8 +1214,8 @@ github.com/status-im/go-ethereum v1.10.4-status.4 h1:Cs0MoHEhF0LEwii2R8U8jMSEW/g github.com/status-im/go-ethereum v1.10.4-status.4/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20220303160403-f4f307db8734 h1:uyBsGFFH63+SHerRGKRrlWVYI8XFm3PoP4v2ma4RFIs= -github.com/status-im/go-waku v0.0.0-20220303160403-f4f307db8734/go.mod h1:7SH3xYPUpUwiwSkSSuqPoJQo01ulLD5hSFDvGZzjsvo= +github.com/status-im/go-waku v0.0.0-20220323131654-8468323cd349 h1:1aER9CYT/4SDbic0O+nFRhzh84ChTQA2wBYI2fa/pkQ= +github.com/status-im/go-waku v0.0.0-20220323131654-8468323cd349/go.mod h1:brdbk2SAE5B5E7Av4RyxiKXLUbCKrJkIv0l32KtQUa4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/gomoji v1.1.3-0.20220213022530-e5ac4a8732d4 h1:CtobZoiNdHpx+xurFxnuJ1xsGm3oKMfcZkB3vmomJmA= diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go index 0f4de41ce..62fd64f86 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go @@ -285,6 +285,7 @@ func (d *DiscoveryV5) UpdateAddr(addr net.IP) error { return nil } +/* func isWakuNode(node *enode.Node) bool { enrField := new(utils.WakuEnrBitfield) if err := node.Record().Load(enr.WithEntry(utils.WakuENRField, &enrField)); err != nil { @@ -300,6 +301,7 @@ func isWakuNode(node *enode.Node) bool { return false } +*/ func hasTCPPort(node *enode.Node) bool { enrTCP := new(enr.TCP) @@ -318,7 +320,8 @@ func evaluateNode(node *enode.Node) bool { return false } - if !isWakuNode(node) || !hasTCPPort(node) { + // TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage + if /*!isWakuNode(node) ||*/ !hasTCPPort(node) { return false } @@ -362,22 +365,25 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi break } - address, err := utils.EnodeToMultiAddr(iterator.Node()) + addresses, err := utils.Multiaddress(iterator.Node()) if err != nil { d.log.Error(err) continue } - peerInfo, err := peer.AddrInfoFromP2pAddr(address) + peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...) if err != nil { d.log.Error(err) continue } - d.peerCache.recs[peerInfo.ID] = peerRecord{ - expire: time.Now().Unix() + 3600, // Expires in 1hr - peer: *peerInfo, + for _, p := range peerAddrs { + d.peerCache.recs[p.ID] = peerRecord{ + expire: time.Now().Unix() + 3600, // Expires in 1hr + peer: p, + } } + } close(doneCh) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go b/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go index ad6d4d0ed..58e92fd6b 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/waku_payload.go @@ -45,7 +45,6 @@ type KeyInfo struct { SymKey []byte // If the encryption is Symmetric, a Symmetric key must be specified PubKey ecdsa.PublicKey // If the encryption is Asymmetric, the public key of the message receptor must be specified PrivKey *ecdsa.PrivateKey // Set a privkey if the message requires a signature - } // Encode encodes a payload depending on the version parameter. diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index 24e732b3d..0c450af94 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -44,6 +44,8 @@ type Peer struct { Connected bool } +type storeFactory func(w *WakuNode) store.Store + type WakuNode struct { host host.Host opts *WakuNodeParameters @@ -53,7 +55,7 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService - store *store.WakuStore + store store.Store swap *swap.WakuSwap wakuFlag utils.WakuEnrBitfield @@ -79,6 +81,12 @@ type WakuNode struct { // Channel passed to WakuNode constructor // receiving connection status notifications connStatusChan chan ConnStatus + + storeFactory storeFactory +} + +func defaultStoreFactory(w *WakuNode) store.Store { + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) } func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { @@ -136,6 +144,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.keepAliveFails = make(map[peer.ID]int) w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) + if params.storeFactory != nil { + w.storeFactory = params.storeFactory + } else { + w.storeFactory = defaultStoreFactory + } + if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { return nil, err } @@ -247,7 +261,7 @@ func (w *WakuNode) Start() error { swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), }...) - w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + w.store = w.storeFactory(w) if w.opts.enableStore { w.startStore() } @@ -298,7 +312,7 @@ func (w *WakuNode) Start() error { // Subscribe store to topic if w.opts.storeMsgs { w.log.Info("Subscribing store to broadcaster") - w.bcaster.Register(w.store.MsgC) + w.bcaster.Register(w.store.MessageChannel()) } if w.filter != nil { @@ -360,7 +374,7 @@ func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay } -func (w *WakuNode) Store() *store.WakuStore { +func (w *WakuNode) Store() store.Store { return w.store } @@ -433,6 +447,10 @@ func (w *WakuNode) mountDiscV5() error { discv5.WithAutoUpdate(w.opts.discV5autoUpdate), } + if w.opts.advertiseAddr != nil { + discV5Options = append(discV5Options, discv5.WithAdvertiseAddr(*w.opts.advertiseAddr)) + } + addr := w.ListenAddresses()[0] ipStr, err := addr.ValueForProtocol(ma.P_IP4) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go index 0615c2819..cc12ff6ec 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go @@ -27,7 +27,7 @@ import ( const clientId string = "Go Waku v2 node" // Default minRelayPeersToPublish -const defaultMinRelayPeersToPublish = 1 +const defaultMinRelayPeersToPublish = 0 type WakuNodeParameters struct { hostAddr *net.TCPAddr @@ -74,6 +74,8 @@ type WakuNodeParameters struct { enableLightPush bool connStatusC chan ConnStatus + + storeFactory storeFactory } type WakuNodeOption func(*WakuNodeParameters) error @@ -245,6 +247,14 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +func WithWakuStoreFactory(factory storeFactory) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeFactory = factory + + return nil + } +} + // WithWakuSwap set the option of the Waku V2 Swap protocol func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index 53b0705cb..fd1ca2347 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -152,6 +152,8 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer)) if err != nil { + wf.subscribers.FlagAsFailure(subscriber.peer) + wf.log.Error("failed to connect to peer", err) return err } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/utils.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/utils.go index e1a4afdc3..80f392a02 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/utils.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/utils.go @@ -1,7 +1,8 @@ package pb import ( - gcrypto "github.com/ethereum/go-ethereum/crypto" + "crypto/sha256" + proto "github.com/golang/protobuf/proto" ) @@ -15,7 +16,8 @@ func (msg *WakuMessage) Hash() ([]byte, error) { return Hash(out), nil } -// Hash calculates a hash from a byte slice using keccak256 for the hashing algorithm +// Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm func Hash(data []byte) []byte { - return gcrypto.Keccak256(data) + hash := sha256.Sum256(data) + return hash[:] } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go index b029c2a01..56ccc00e2 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go @@ -8,6 +8,9 @@ import ( "github.com/status-im/go-waku/waku/v2/utils" ) +// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp +const MaxTimeVariance = time.Duration(20) * time.Second + type MessageQueue struct { sync.RWMutex @@ -21,6 +24,8 @@ type MessageQueue struct { } var ErrDuplicatedMessage = errors.New("duplicated message") +var ErrFutureMessage = errors.New("message timestamp in the future") +var ErrTooOld = errors.New("message is too old") func (self *MessageQueue) Push(msg IndexedWakuMessage) error { self.Lock() @@ -33,10 +38,20 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) error { return ErrDuplicatedMessage } + // Ensure that messages don't "jump" to the front of the queue with future timestamps + if msg.index.SenderTime-msg.index.ReceiverTime > int64(MaxTimeVariance) { + return ErrFutureMessage + } + self.seen[k] = struct{}{} self.messages = append(self.messages, msg) if self.maxMessages != 0 && len(self.messages) > self.maxMessages { + + if indexComparison(msg.index, self.messages[0].index) < 0 { + return ErrTooOld // :( + } + numToPop := len(self.messages) - self.maxMessages self.messages = self.messages[numToPop:len(self.messages)] } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index 9d245baa9..9215fed57 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -240,6 +240,15 @@ type WakuStore struct { swap *swap.WakuSwap } +type Store interface { + Start(ctx context.Context) + Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) + Next(ctx context.Context, r *Result) (*Result, error) + Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) + MessageChannel() chan *protocol.Envelope + Stop() +} + // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore { wakuStore := new(WakuStore) @@ -775,6 +784,10 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return msgCount, nil } +func (store *WakuStore) MessageChannel() chan *protocol.Envelope { + return store.MsgC +} + // TODO: queryWithAccounting // Stop closes the store message channel and removes the protocol stream handler diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go index 2ff67ed1d..c8fa4a6a1 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go @@ -144,8 +144,13 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { var multiaddrRaw []byte if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil { - if !enr.IsNotFound(err) { - Logger().Error("could not retrieve multiaddress field for node ", zap.Any("enode", node)) + if enr.IsNotFound(err) { + Logger().Debug("Trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node)) + addr, err := EnodeToMultiAddr(node) + if err != nil { + return nil, err + } + return []ma.Multiaddr{addr}, nil } return nil, err } diff --git a/vendor/modules.txt b/vendor/modules.txt index dfb96d3f5..0330b9374 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -455,7 +455,7 @@ github.com/status-im/go-discover/discover/v4wire github.com/status-im/go-discover/discover/v5wire # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20220303160403-f4f307db8734 +# github.com/status-im/go-waku v0.0.0-20220323131654-8468323cd349 github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/try github.com/status-im/go-waku/waku/v2