From d8a49c538b744fbde10acff5b395cef91e24cd78 Mon Sep 17 00:00:00 2001 From: kaichao Date: Thu, 27 Jun 2024 09:54:31 +0800 Subject: [PATCH] Reset MVDS epoch after peer is online (#5349) * feat_: reset epoch for online peer * chore_: fix * chore_: refactor * chore_: update mvds * chore_: fix lint * chore_: update mvds * chore_: make vendor * chore_: fix tst * chore_: tuning store query hash parameter * chore_: non-blocking mvds status change channel --- go.mod | 2 +- go.sum | 4 +- protocol/common/message_sender.go | 3 +- protocol/messenger.go | 7 +- protocol/messenger_mailserver.go | 2 +- protocol/messenger_status_updates.go | 19 ++++ .../mvds/node/migrations/migrations.go | 40 ++++----- vendor/github.com/status-im/mvds/node/node.go | 66 +++++++++++--- .../mvds/peers/migrations/migrations.go | 40 ++++----- .../status-im/mvds/protobuf/sync.pb.go | 4 +- .../mvds/state/migrations/migrations.go | 88 ++++++++++++++----- .../github.com/status-im/mvds/state/state.go | 1 + .../status-im/mvds/state/state_memory.go | 13 +++ .../status-im/mvds/state/state_sqlite.go | 80 +++++++++++++++++ .../mvds/store/migrations/migrations.go | 40 ++++----- vendor/modules.txt | 2 +- wakuv2/waku.go | 19 ++-- 17 files changed, 319 insertions(+), 111 deletions(-) diff --git a/go.mod b/go.mod index fc13994c5..ed217542a 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/status-im/doubleratchet v3.0.0+incompatible github.com/status-im/markdown v0.0.0-20240404192634-b7e33c6ac3d4 github.com/status-im/migrate/v4 v4.6.2-status.3 - github.com/status-im/mvds v0.0.27-0.20240528050259-27702bea5ab7 + github.com/status-im/mvds v0.0.27-0.20240624014816-2dd6758177e5 github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e github.com/status-im/status-go/extkeys v1.1.2 github.com/status-im/tcp-shaker v1.1.1-status diff --git a/go.sum b/go.sum index 12b76b77e..48a06dd0b 100644 --- a/go.sum +++ b/go.sum @@ -2034,8 +2034,8 @@ github.com/status-im/markdown v0.0.0-20240404192634-b7e33c6ac3d4 h1:KBeXtOoisXji github.com/status-im/markdown v0.0.0-20240404192634-b7e33c6ac3d4/go.mod h1:5rjPyv3KffPNVbFjnsVy0NGj9+JeW40WvXLdxH1VKuE= github.com/status-im/migrate/v4 v4.6.2-status.3 h1:Khwjb59NzniloUr5i9s9AtkEyqBbQFt1lkoAu66sAu0= github.com/status-im/migrate/v4 v4.6.2-status.3/go.mod h1:c/kc90n47GZu/58nnz1OMLTf7uE4Da4gZP5qmU+A/v8= -github.com/status-im/mvds v0.0.27-0.20240528050259-27702bea5ab7 h1:oJvhoGTaGmLFTlp8KuoWLzUvPupite5SHrKI6Y/1Ddk= -github.com/status-im/mvds v0.0.27-0.20240528050259-27702bea5ab7/go.mod h1:2fiAx0q9XYIPKYRq2B1oiO9zZESy/n4D32gWw6lMDsE= +github.com/status-im/mvds v0.0.27-0.20240624014816-2dd6758177e5 h1:+yVCBKrEwMRsZupZJgzlmI52Sa/8KSzUD5cPAkDe7+Y= +github.com/status-im/mvds v0.0.27-0.20240624014816-2dd6758177e5/go.mod h1:2fiAx0q9XYIPKYRq2B1oiO9zZESy/n4D32gWw6lMDsE= github.com/status-im/notify v1.0.2-status h1:x8wev0Sh8H8KAf4bVcv+L0dVHldBESOKUlqRqRY7uL8= github.com/status-im/notify v1.0.2-status/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc= github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e h1:pCOHeAYmYttXQBCn+6u01bs5d/W3XslxmplFhru4X1Y= diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 219d6c6f4..5398c1a66 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -126,7 +126,7 @@ func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secr s.handleSharedSecrets = handler } -func (s *MessageSender) StartDatasync(handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error { +func (s *MessageSender) StartDatasync(statusChangeEvent chan datasyncnode.PeerStatusChangeEvent, handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error { if !s.datasyncEnabled { return nil } @@ -138,6 +138,7 @@ func (s *MessageSender) StartDatasync(handler func(peer state.PeerID, payload *d datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey), datasyncnode.BATCH, datasync.CalculateSendTime, + statusChangeEvent, s.logger, ) if err != nil { diff --git a/protocol/messenger.go b/protocol/messenger.go index c3efc08c6..40e1467f0 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -21,6 +21,8 @@ import ( "go.uber.org/zap" "golang.org/x/time/rate" + datasyncnode "github.com/status-im/mvds/node" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/event" @@ -191,6 +193,8 @@ type Messenger struct { peersyncing *peersyncing.PeerSyncing peersyncingOffers map[string]uint64 peersyncingRequests map[string]uint64 + + mvdsStatusChangeEvent chan datasyncnode.PeerStatusChangeEvent } type connStatus int @@ -570,6 +574,7 @@ func NewMessenger( peersyncingOffers: make(map[string]uint64), peersyncingRequests: make(map[string]uint64), peerStore: peerStore, + mvdsStatusChangeEvent: make(chan datasyncnode.PeerStatusChangeEvent, 5), verificationDatabase: verification.NewPersistence(database), mailserverCycle: mailserverCycle{ peers: make(map[string]peerStatus), @@ -775,7 +780,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { // set shared secret handles m.sender.SetHandleSharedSecrets(m.handleSharedSecrets) - if err := m.sender.StartDatasync(m.sendDataSync); err != nil { + if err := m.sender.StartDatasync(m.mvdsStatusChangeEvent, m.sendDataSync); err != nil { return nil, err } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index ce370933b..4290790b1 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -1075,7 +1075,7 @@ func (m *Messenger) ConnectionChanged(state connection.State) { } if m.connectionState.Offline && !state.Offline { - err := m.sender.StartDatasync(m.sendDataSync) + err := m.sender.StartDatasync(m.mvdsStatusChangeEvent, m.sendDataSync) if err != nil { m.logger.Error("failed to start datasync", zap.Error(err)) } diff --git a/protocol/messenger_status_updates.go b/protocol/messenger_status_updates.go index 6188a294d..35ebfe6dc 100644 --- a/protocol/messenger_status_updates.go +++ b/protocol/messenger_status_updates.go @@ -8,6 +8,10 @@ import ( "github.com/golang/protobuf/proto" "go.uber.org/zap" + datasyncnode "github.com/status-im/mvds/node" + + datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer" + "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/communities" @@ -264,6 +268,21 @@ func (m *Messenger) HandleStatusUpdate(state *ReceivedMessageState, message *pro return err } state.Response.AddStatusUpdate(statusUpdate) + if statusUpdate.StatusType == int(protobuf.StatusUpdate_AUTOMATIC) || + statusUpdate.StatusType == int(protobuf.StatusUpdate_ALWAYS_ONLINE) || + statusUpdate.StatusType == int(protobuf.StatusUpdate_INACTIVE) { + m.logger.Debug("reset data sync for peer", zap.String("public_key", statusUpdate.PublicKey), zap.Uint64("clock", statusUpdate.Clock)) + select { + case m.mvdsStatusChangeEvent <- datasyncnode.PeerStatusChangeEvent{ + PeerID: datasyncpeer.PublicKeyToPeerID(*state.CurrentMessageState.PublicKey), + Status: datasyncnode.OnlineStatus, + EventTime: statusUpdate.Clock, + }: + default: + m.logger.Debug("mvdsStatusChangeEvent channel is full") + } + + } } return nil diff --git a/vendor/github.com/status-im/mvds/node/migrations/migrations.go b/vendor/github.com/status-im/mvds/node/migrations/migrations.go index 36130b234..e66bb21a4 100644 --- a/vendor/github.com/status-im/mvds/node/migrations/migrations.go +++ b/vendor/github.com/status-im/mvds/node/migrations/migrations.go @@ -12,6 +12,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -21,7 +22,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } var buf bytes.Buffer @@ -29,7 +30,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } if clErr != nil { return nil, err @@ -85,7 +86,7 @@ func _1565345162_initial_schemaDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565345162_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565345162_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7c, 0x69, 0xd2, 0x3, 0xea, 0x82, 0x7c, 0xb3, 0x44, 0x6c, 0xef, 0x64, 0x2c, 0x99, 0x62, 0xa2, 0x8b, 0x6f, 0x96, 0x4f, 0x34, 0x41, 0x87, 0xd5, 0x4e, 0x3, 0x7f, 0x4a, 0xd1, 0x91, 0x9, 0x99}} return a, nil } @@ -105,7 +106,7 @@ func _1565345162_initial_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565345162_initial_schema.up.sql", size: 86, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565345162_initial_schema.up.sql", size: 86, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x78, 0x7c, 0xdd, 0x67, 0x61, 0x3e, 0x7f, 0xd4, 0xce, 0xb0, 0x17, 0xbe, 0x5a, 0xa7, 0x9e, 0x93, 0x34, 0xe8, 0xbb, 0x44, 0xfb, 0x88, 0xd6, 0x18, 0x6d, 0x9f, 0xb4, 0x22, 0xda, 0xbc, 0x87, 0x94}} return a, nil } @@ -125,7 +126,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}} return a, nil } @@ -222,24 +223,21 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ "1565345162_initial_schema.down.sql": _1565345162_initial_schemaDownSql, - "1565345162_initial_schema.up.sql": _1565345162_initial_schemaUpSql, - "doc.go": docGo, -} -// AssetDebug is true if the assets were built with the debug flag enabled. -const AssetDebug = false + "1565345162_initial_schema.up.sql": _1565345162_initial_schemaUpSql, + + "doc.go": docGo, +} // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// -// data/ -// foo.txt -// img/ -// a.png -// b.png -// +// data/ +// foo.txt +// img/ +// a.png +// b.png // then AssetDir("data") would return []string{"foo.txt", "img"}, // AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and @@ -272,9 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1565345162_initial_schema.down.sql": {_1565345162_initial_schemaDownSql, map[string]*bintree{}}, - "1565345162_initial_schema.up.sql": {_1565345162_initial_schemaUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, + "1565345162_initial_schema.down.sql": &bintree{_1565345162_initial_schemaDownSql, map[string]*bintree{}}, + "1565345162_initial_schema.up.sql": &bintree{_1565345162_initial_schemaUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. @@ -291,7 +289,7 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = os.WriteFile(_filePath(dir, name), data, info.Mode()) + err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) if err != nil { return err } diff --git a/vendor/github.com/status-im/mvds/node/node.go b/vendor/github.com/status-im/mvds/node/node.go index b148e399a..d89f05588 100644 --- a/vendor/github.com/status-im/mvds/node/node.go +++ b/vendor/github.com/status-im/mvds/node/node.go @@ -8,6 +8,7 @@ import ( "database/sql" "encoding/hex" "fmt" + "math/rand" "sync/atomic" "time" @@ -31,6 +32,21 @@ const ( // CalculateNextEpoch is a function used to calculate the next `SendEpoch` for a given message. type CalculateNextEpoch func(count uint64, epoch int64) int64 +type EventStatus int + +const ( + OnlineStatus EventStatus = iota + OfflineStatus +) + +const FreshEventPeriod = 10 // seconds + +type PeerStatusChangeEvent struct { + PeerID state.PeerID + Status EventStatus + EventTime uint64 +} + // Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages. type Node struct { // This needs to be declared first: https://github.com/golang/go/issues/9959 @@ -56,6 +72,8 @@ type Node struct { subscription chan protobuf.Message + peerStatusChangeEvent chan PeerStatusChangeEvent + logger *zap.Logger } @@ -65,6 +83,7 @@ func NewPersistentNode( id state.PeerID, mode Mode, nextEpoch CalculateNextEpoch, + peerStatusChangeEvent chan PeerStatusChangeEvent, logger *zap.Logger, ) (*Node, error) { ctx, cancel := context.WithCancel(context.Background()) @@ -73,18 +92,19 @@ func NewPersistentNode( } node := Node{ - ID: id, - ctx: ctx, - cancel: cancel, - store: store.NewPersistentMessageStore(db), - transport: st, - peers: peers.NewSQLitePersistence(db), - syncState: state.NewPersistentSyncState(db), - payloads: newPayloads(), - epochPersistence: newEpochSQLitePersistence(db), - nextEpoch: nextEpoch, - logger: logger.With(zap.Namespace("mvds")), - mode: mode, + ID: id, + ctx: ctx, + cancel: cancel, + store: store.NewPersistentMessageStore(db), + transport: st, + peers: peers.NewSQLitePersistence(db), + syncState: state.NewPersistentSyncState(db), + payloads: newPayloads(), + epochPersistence: newEpochSQLitePersistence(db), + nextEpoch: nextEpoch, + peerStatusChangeEvent: peerStatusChangeEvent, + logger: logger.With(zap.Namespace("mvds")), + mode: mode, } if currentEpoch, err := node.epochPersistence.Get(id); err != nil { return nil, err @@ -175,6 +195,21 @@ func (n *Node) Start(duration time.Duration) { } }() + go func() { + for { + select { + case <-n.ctx.Done(): + n.logger.Info("reset data sync for peer stopped") + return + case event := <-n.peerStatusChangeEvent: + if event.Status == OnlineStatus && event.EventTime > uint64(time.Now().Unix())-FreshEventPeriod { + n.logger.Debug("resetting peer epoch", zap.String("peerID", hex.EncodeToString(event.PeerID[:4]))) + n.resetPeerEpoch(event.PeerID) + } + } + } + }() + go func() { for { select { @@ -549,6 +584,13 @@ func (n *Node) updateSendEpoch(s state.State) state.State { return s } +func (n *Node) resetPeerEpoch(peerID state.PeerID) { + n.syncState.MapWithPeerId(peerID, func(s state.State) state.State { + s.SendEpoch = n.epoch + int64(rand.Intn(60)) + return s + }) +} + func toMessageID(b []byte) state.MessageID { var id state.MessageID copy(id[:], b) diff --git a/vendor/github.com/status-im/mvds/peers/migrations/migrations.go b/vendor/github.com/status-im/mvds/peers/migrations/migrations.go index df6aacb06..5363d2e55 100644 --- a/vendor/github.com/status-im/mvds/peers/migrations/migrations.go +++ b/vendor/github.com/status-im/mvds/peers/migrations/migrations.go @@ -12,6 +12,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -21,7 +22,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } var buf bytes.Buffer @@ -29,7 +30,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } if clErr != nil { return nil, err @@ -85,7 +86,7 @@ func _1565249278_initial_schemaDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565249278_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565249278_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4, 0xfb, 0x5, 0x92, 0xf0, 0x93, 0xaa, 0x83, 0xb7, 0xdf, 0x66, 0xe2, 0x97, 0x53, 0x9d, 0x34, 0xd3, 0xca, 0x97, 0xd8, 0xe1, 0xed, 0xf0, 0x4a, 0x94, 0x1a, 0xb1, 0x8f, 0xcf, 0xc, 0xa4, 0x6}} return a, nil } @@ -105,7 +106,7 @@ func _1565249278_initial_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565249278_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565249278_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xbc, 0x3a, 0x87, 0x12, 0x93, 0xeb, 0xb4, 0xcc, 0x42, 0x6e, 0xb2, 0x7d, 0xfa, 0x9a, 0xa8, 0x3f, 0xb, 0x6b, 0xa8, 0x2d, 0x8b, 0xde, 0x67, 0x2a, 0xa8, 0xa5, 0x42, 0xad, 0x27, 0x15, 0x7e}} return a, nil } @@ -125,7 +126,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}} return a, nil } @@ -222,24 +223,21 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ "1565249278_initial_schema.down.sql": _1565249278_initial_schemaDownSql, - "1565249278_initial_schema.up.sql": _1565249278_initial_schemaUpSql, - "doc.go": docGo, -} -// AssetDebug is true if the assets were built with the debug flag enabled. -const AssetDebug = false + "1565249278_initial_schema.up.sql": _1565249278_initial_schemaUpSql, + + "doc.go": docGo, +} // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// -// data/ -// foo.txt -// img/ -// a.png -// b.png -// +// data/ +// foo.txt +// img/ +// a.png +// b.png // then AssetDir("data") would return []string{"foo.txt", "img"}, // AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and @@ -272,9 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1565249278_initial_schema.down.sql": {_1565249278_initial_schemaDownSql, map[string]*bintree{}}, - "1565249278_initial_schema.up.sql": {_1565249278_initial_schemaUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, + "1565249278_initial_schema.down.sql": &bintree{_1565249278_initial_schemaDownSql, map[string]*bintree{}}, + "1565249278_initial_schema.up.sql": &bintree{_1565249278_initial_schemaUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. @@ -291,7 +289,7 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = os.WriteFile(_filePath(dir, name), data, info.Mode()) + err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) if err != nil { return err } diff --git a/vendor/github.com/status-im/mvds/protobuf/sync.pb.go b/vendor/github.com/status-im/mvds/protobuf/sync.pb.go index 9d9e9cbeb..263c78a04 100644 --- a/vendor/github.com/status-im/mvds/protobuf/sync.pb.go +++ b/vendor/github.com/status-im/mvds/protobuf/sync.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.29.1 -// protoc v3.21.12 +// protoc-gen-go v1.32.0 +// protoc v4.25.0 // source: sync.proto package protobuf diff --git a/vendor/github.com/status-im/mvds/state/migrations/migrations.go b/vendor/github.com/status-im/mvds/state/migrations/migrations.go index 4f9db263b..98368ecfe 100644 --- a/vendor/github.com/status-im/mvds/state/migrations/migrations.go +++ b/vendor/github.com/status-im/mvds/state/migrations/migrations.go @@ -2,6 +2,8 @@ // sources: // 1565341329_initial_schema.down.sql (24B) // 1565341329_initial_schema.up.sql (294B) +// 1718939515_index_peer_id.down.sql (46B) +// 1718939515_index_peer_id.up.sql (76B) // doc.go (377B) package migrations @@ -12,6 +14,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -21,7 +24,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } var buf bytes.Buffer @@ -29,7 +32,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } if clErr != nil { return nil, err @@ -85,7 +88,7 @@ func _1565341329_initial_schemaDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565341329_initial_schema.down.sql", size: 24, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565341329_initial_schema.down.sql", size: 24, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x20, 0x56, 0x1a, 0x0, 0xc5, 0x81, 0xb3, 0xeb, 0x2a, 0xae, 0xed, 0xbb, 0x68, 0x51, 0x68, 0xc7, 0xe3, 0x31, 0xe, 0x1, 0x3e, 0xd2, 0x85, 0x9e, 0x6d, 0x55, 0xad, 0x55, 0xd6, 0x2f, 0x29, 0xca}} return a, nil } @@ -105,11 +108,51 @@ func _1565341329_initial_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565341329_initial_schema.up.sql", size: 294, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565341329_initial_schema.up.sql", size: 294, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3e, 0xa5, 0x37, 0x9d, 0x3f, 0xf3, 0xc9, 0xc8, 0x12, 0x74, 0x79, 0x74, 0xff, 0xfd, 0xb1, 0x5f, 0x13, 0xaf, 0xf2, 0x50, 0x14, 0x9f, 0xdf, 0xc8, 0xc5, 0xa7, 0xc3, 0xf5, 0xa4, 0x8e, 0x8a, 0xf6}} return a, nil } +var __1718939515_index_peer_idDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\x88\xcf\x2d\x4b\x29\x8e\x2f\x2e\x49\x2c\x49\x2d\x8e\x2f\x48\x4d\x2d\x8a\xcf\x4c\xb1\xe6\x02\x04\x00\x00\xff\xff\x7e\x04\x3e\x34\x2e\x00\x00\x00") + +func _1718939515_index_peer_idDownSqlBytes() ([]byte, error) { + return bindataRead( + __1718939515_index_peer_idDownSql, + "1718939515_index_peer_id.down.sql", + ) +} + +func _1718939515_index_peer_idDownSql() (*asset, error) { + bytes, err := _1718939515_index_peer_idDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1718939515_index_peer_id.down.sql", size: 46, mode: os.FileMode(0644), modTime: time.Unix(1718939687, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd3, 0x82, 0xd8, 0x4a, 0xc6, 0xf3, 0xbc, 0x7d, 0x4d, 0x81, 0x96, 0x91, 0x8f, 0x4d, 0x84, 0x1c, 0x26, 0xdb, 0x59, 0xb3, 0x4f, 0x14, 0xfb, 0x65, 0x8d, 0x58, 0xc4, 0xe2, 0x23, 0x20, 0x19, 0x16}} + return a, nil +} + +var __1718939515_index_peer_idUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\xf0\xf3\x0f\x51\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\x88\xcf\x2d\x4b\x29\x8e\x2f\x2e\x49\x2c\x49\x2d\x8e\x2f\x48\x4d\x2d\x8a\xcf\x4c\x51\xf0\xf7\x53\x40\x12\xd6\x80\x0a\x6b\x5a\x73\x01\x02\x00\x00\xff\xff\x74\xa6\x2f\x6d\x4c\x00\x00\x00") + +func _1718939515_index_peer_idUpSqlBytes() ([]byte, error) { + return bindataRead( + __1718939515_index_peer_idUpSql, + "1718939515_index_peer_id.up.sql", + ) +} + +func _1718939515_index_peer_idUpSql() (*asset, error) { + bytes, err := _1718939515_index_peer_idUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1718939515_index_peer_id.up.sql", size: 76, mode: os.FileMode(0644), modTime: time.Unix(1718939763, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x54, 0x95, 0xa4, 0xe3, 0x3d, 0x8a, 0xa2, 0x7a, 0x89, 0x6, 0xc6, 0x47, 0x73, 0x4, 0xe6, 0x7c, 0xda, 0x50, 0xa2, 0x97, 0x1f, 0x1f, 0x31, 0x45, 0xd8, 0x8b, 0x16, 0x5e, 0xc3, 0xdd, 0x5b, 0x5a}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\xbb\x6e\xc3\x30\x0c\x45\x77\x7f\xc5\x45\x96\x2c\xb5\xb4\x74\xea\xd6\xb1\x7b\x7f\x80\x91\x68\x89\x88\x1e\xae\x48\xe7\xf1\xf7\x85\xd3\x02\xcd\xd6\xf5\x00\xe7\xf0\xd2\x7b\x7c\x66\x51\x2c\x52\x18\xa2\x68\x1c\x58\x95\xc6\x1d\x27\x0e\xb4\x29\xe3\x90\xc4\xf2\x76\x72\xa1\x57\xaf\x46\xb6\xe9\x2c\xd5\x57\x49\x83\x8c\xfd\xe5\xf5\x30\x79\x8f\x40\xed\x68\xc8\xd4\x62\xe1\x47\x4b\xa1\x46\xc3\xa4\x25\x5c\xc5\x32\x08\xeb\xe0\x45\x6e\x0e\xef\x86\xc2\xa4\x06\xcb\x64\x47\x85\x65\x46\x20\xe5\x3d\xb3\xf4\x81\xd4\xe7\x93\xb4\x48\x46\x6e\x47\x1f\xcb\x13\xd9\x17\x06\x2a\x85\x23\x96\xd1\xeb\xc3\x55\xaa\x8c\x28\x83\x83\xf5\x71\x7f\x01\xa9\xb2\xa1\x51\x65\xdd\xfd\x4c\x17\x46\xeb\xbf\xe7\x41\x2d\xfe\xff\x11\xae\x7d\x9c\x15\xa4\xe0\xdb\xca\xc1\x38\xba\x69\x5a\x29\x9c\x29\x31\xf4\xab\x88\xf1\x34\x79\x9f\xfa\x5b\xe2\xc6\xbb\xf5\xbc\x71\x5e\xcf\x09\x3f\x35\xe9\x4d\x31\x77\x38\xe7\xff\x80\x4b\x1d\x6e\xfa\x0e\x00\x00\xff\xff\x9d\x60\x3d\x88\x79\x01\x00\x00") func docGoBytes() ([]byte, error) { @@ -125,7 +168,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}} return a, nil } @@ -222,24 +265,25 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ "1565341329_initial_schema.down.sql": _1565341329_initial_schemaDownSql, - "1565341329_initial_schema.up.sql": _1565341329_initial_schemaUpSql, - "doc.go": docGo, -} -// AssetDebug is true if the assets were built with the debug flag enabled. -const AssetDebug = false + "1565341329_initial_schema.up.sql": _1565341329_initial_schemaUpSql, + + "1718939515_index_peer_id.down.sql": _1718939515_index_peer_idDownSql, + + "1718939515_index_peer_id.up.sql": _1718939515_index_peer_idUpSql, + + "doc.go": docGo, +} // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// -// data/ -// foo.txt -// img/ -// a.png -// b.png -// +// data/ +// foo.txt +// img/ +// a.png +// b.png // then AssetDir("data") would return []string{"foo.txt", "img"}, // AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and @@ -272,9 +316,11 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1565341329_initial_schema.down.sql": {_1565341329_initial_schemaDownSql, map[string]*bintree{}}, - "1565341329_initial_schema.up.sql": {_1565341329_initial_schemaUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, + "1565341329_initial_schema.down.sql": &bintree{_1565341329_initial_schemaDownSql, map[string]*bintree{}}, + "1565341329_initial_schema.up.sql": &bintree{_1565341329_initial_schemaUpSql, map[string]*bintree{}}, + "1718939515_index_peer_id.down.sql": &bintree{_1718939515_index_peer_idDownSql, map[string]*bintree{}}, + "1718939515_index_peer_id.up.sql": &bintree{_1718939515_index_peer_idUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. @@ -291,7 +337,7 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = os.WriteFile(_filePath(dir, name), data, info.Mode()) + err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) if err != nil { return err } diff --git a/vendor/github.com/status-im/mvds/state/state.go b/vendor/github.com/status-im/mvds/state/state.go index 977fbe282..6da0371f6 100644 --- a/vendor/github.com/status-im/mvds/state/state.go +++ b/vendor/github.com/status-im/mvds/state/state.go @@ -26,4 +26,5 @@ type SyncState interface { Remove(id MessageID, peer PeerID) error All(epoch int64) ([]State, error) Map(epoch int64, process func(State) State) error + MapWithPeerId(peerID PeerID, process func(State) State) error } diff --git a/vendor/github.com/status-im/mvds/state/state_memory.go b/vendor/github.com/status-im/mvds/state/state_memory.go index 7514b6089..fa8609b7e 100644 --- a/vendor/github.com/status-im/mvds/state/state_memory.go +++ b/vendor/github.com/status-im/mvds/state/state_memory.go @@ -59,3 +59,16 @@ func (s *memorySyncState) Map(epoch int64, process func(State) State) error { return nil } + +func (s *memorySyncState) MapWithPeerId(peerID PeerID, process func(State) State) error { + s.Lock() + defer s.Unlock() + + for i, state := range s.state { + if state.PeerID == peerID { + s.state[i] = process(state) + } + } + + return nil +} diff --git a/vendor/github.com/status-im/mvds/state/state_sqlite.go b/vendor/github.com/status-im/mvds/state/state_sqlite.go index 1d29a716f..9b4b59215 100644 --- a/vendor/github.com/status-im/mvds/state/state_sqlite.go +++ b/vendor/github.com/status-im/mvds/state/state_sqlite.go @@ -10,6 +10,8 @@ var ( ErrStateNotFound = errors.New("state not found") ) +const BatchResetEpochMessages = 100 + // Verify that SyncState interface is implemented. var _ SyncState = (*sqliteSyncState)(nil) @@ -104,6 +106,52 @@ func (p *sqliteSyncState) All(epoch int64) ([]State, error) { return result, nil } +func (p *sqliteSyncState) QueryByPeerID(peerID PeerID, limit int) ([]State, error) { + var result []State + + rows, err := p.db.Query(` + SELECT + type, send_count, send_epoch, group_id, peer_id, message_id + FROM + mvds_states + WHERE + peer_id = ? + LIMIT ? + `, peerID[:], limit) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var ( + state State + groupID, peerID, messageID []byte + ) + err := rows.Scan( + &state.Type, + &state.SendCount, + &state.SendEpoch, + &groupID, + &peerID, + &messageID, + ) + if err != nil { + return nil, err + } + if len(groupID) > 0 { + val := GroupID{} + copy(val[:], groupID) + state.GroupID = &val + } + copy(state.PeerID[:], peerID) + copy(state.MessageID[:], messageID) + + result = append(result, state) + } + + return result, nil +} + func (p *sqliteSyncState) Map(epoch int64, process func(State) State) error { states, err := p.All(epoch) if err != nil { @@ -140,6 +188,38 @@ func (p *sqliteSyncState) Map(epoch int64, process func(State) State) error { return tx.Commit() } +func (p *sqliteSyncState) MapWithPeerId(peerID PeerID, process func(State) State) error { + states, err := p.QueryByPeerID(peerID, BatchResetEpochMessages) + if err != nil { + return err + } + + var updated []State + + for _, state := range states { + newState := process(state) + if newState != state { + updated = append(updated, newState) + } + } + + if len(updated) == 0 { + return nil + } + + tx, err := p.db.Begin() + if err != nil { + return err + } + for _, state := range updated { + if err := updateInTx(tx, state); err != nil { + _ = tx.Rollback() + return err + } + } + return tx.Commit() +} + func updateInTx(tx *sql.Tx, state State) error { _, err := tx.Exec(` UPDATE mvds_states diff --git a/vendor/github.com/status-im/mvds/store/migrations/migrations.go b/vendor/github.com/status-im/mvds/store/migrations/migrations.go index f0416072d..924b2249d 100644 --- a/vendor/github.com/status-im/mvds/store/migrations/migrations.go +++ b/vendor/github.com/status-im/mvds/store/migrations/migrations.go @@ -12,6 +12,7 @@ import ( "crypto/sha256" "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -21,7 +22,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } var buf bytes.Buffer @@ -29,7 +30,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("read %q: %w", name, err) + return nil, fmt.Errorf("read %q: %v", name, err) } if clErr != nil { return nil, err @@ -85,7 +86,7 @@ func _1565447861_initial_schemaDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565447861_initial_schema.down.sql", size: 28, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565447861_initial_schema.down.sql", size: 28, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x92, 0x55, 0x8d, 0x3, 0x68, 0x1a, 0x9c, 0xd7, 0xc7, 0xb4, 0x5a, 0xb1, 0x27, 0x47, 0xf4, 0xc6, 0x8d, 0x85, 0xbb, 0xae, 0xb6, 0x69, 0xc5, 0xbc, 0x21, 0xba, 0xc0, 0xc6, 0x2a, 0xc8, 0xb2, 0xf7}} return a, nil } @@ -105,7 +106,7 @@ func _1565447861_initial_schemaUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1565447861_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "1565447861_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x21, 0x13, 0xbf, 0x64, 0x18, 0xf7, 0xe2, 0xd8, 0xb5, 0x7d, 0x8, 0xf1, 0x66, 0xb9, 0xb3, 0x49, 0x68, 0xe2, 0xa2, 0xea, 0x90, 0x11, 0x70, 0x9c, 0x15, 0x28, 0x3f, 0x3f, 0x90, 0x3c, 0x76, 0xf}} return a, nil } @@ -125,7 +126,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1704726726, 0)} + info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1706497287, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}} return a, nil } @@ -222,24 +223,21 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ "1565447861_initial_schema.down.sql": _1565447861_initial_schemaDownSql, - "1565447861_initial_schema.up.sql": _1565447861_initial_schemaUpSql, - "doc.go": docGo, -} -// AssetDebug is true if the assets were built with the debug flag enabled. -const AssetDebug = false + "1565447861_initial_schema.up.sql": _1565447861_initial_schemaUpSql, + + "doc.go": docGo, +} // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// -// data/ -// foo.txt -// img/ -// a.png -// b.png -// +// data/ +// foo.txt +// img/ +// a.png +// b.png // then AssetDir("data") would return []string{"foo.txt", "img"}, // AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and @@ -272,9 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1565447861_initial_schema.down.sql": {_1565447861_initial_schemaDownSql, map[string]*bintree{}}, - "1565447861_initial_schema.up.sql": {_1565447861_initial_schemaUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, + "1565447861_initial_schema.down.sql": &bintree{_1565447861_initial_schemaDownSql, map[string]*bintree{}}, + "1565447861_initial_schema.up.sql": &bintree{_1565447861_initial_schemaUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. @@ -291,7 +289,7 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = os.WriteFile(_filePath(dir, name), data, info.Mode()) + err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) if err != nil { return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 1b0a4f832..4e5a3b3ee 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -931,7 +931,7 @@ github.com/status-im/migrate/v4/database/postgres github.com/status-im/migrate/v4/database/sqlcipher github.com/status-im/migrate/v4/internal/url github.com/status-im/migrate/v4/source/go_bindata -# github.com/status-im/mvds v0.0.27-0.20240528050259-27702bea5ab7 +# github.com/status-im/mvds v0.0.27-0.20240624014816-2dd6758177e5 ## explicit; go 1.19 github.com/status-im/mvds/node github.com/status-im/mvds/node/migrations diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 37e9ff03c..b78a1fd10 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -85,8 +85,9 @@ const bootnodesQueryBackoffMs = 200 const bootnodesMaxRetries = 7 const cacheTTL = 20 * time.Minute const maxHashQueryLength = 100 -const hashQueryInterval = 5 * time.Second -const messageSentPeriod = 5 // in seconds +const hashQueryInterval = 3 * time.Second +const messageSentPeriod = 3 // in seconds +const messageExpiredPerid = 10 // in seconds type SentEnvelope struct { Envelope *v2protocol.Envelope @@ -1035,8 +1036,10 @@ func (w *Waku) checkIfMessagesStored() { w.logger.Debug("running loop for messages stored check", zap.Any("messageIds", w.sendMsgIDs)) pubsubTopics := make([]string, 0, len(w.sendMsgIDs)) pubsubMessageIds := make([][]gethcommon.Hash, 0, len(w.sendMsgIDs)) + pubsubMessageTime := make([][]uint32, 0, len(w.sendMsgIDs)) for pubsubTopic, subMsgs := range w.sendMsgIDs { var queryMsgIds []gethcommon.Hash + var queryMsgTime []uint32 for msgID, sendTime := range subMsgs { if len(queryMsgIds) >= maxHashQueryLength { break @@ -1044,19 +1047,21 @@ func (w *Waku) checkIfMessagesStored() { // message is sent 5 seconds ago, check if it's stored if uint32(w.timesource.Now().Unix()) > sendTime+messageSentPeriod { queryMsgIds = append(queryMsgIds, msgID) + queryMsgTime = append(queryMsgTime, sendTime) } } w.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic)) if len(queryMsgIds) > 0 { pubsubTopics = append(pubsubTopics, pubsubTopic) pubsubMessageIds = append(pubsubMessageIds, queryMsgIds) + pubsubMessageTime = append(pubsubMessageTime, queryMsgTime) } } w.sendMsgIDsMu.Unlock() pubsubProcessedMessages := make([][]gethcommon.Hash, len(pubsubTopics)) for i, pubsubTopic := range pubsubTopics { - processedMessages := w.messageHashBasedQuery(w.ctx, pubsubMessageIds[i], pubsubTopic) + processedMessages := w.messageHashBasedQuery(w.ctx, pubsubMessageIds[i], pubsubMessageTime[i], pubsubTopic) pubsubProcessedMessages[i] = processedMessages } @@ -1150,7 +1155,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { } // ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes -func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, pubsubTopic string) []gethcommon.Hash { +func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, relayTime []uint32, pubsubTopic string) []gethcommon.Hash { selectedPeer := w.storePeerID if selectedPeer == "" { w.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) @@ -1181,7 +1186,7 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha var ackHashes []gethcommon.Hash var missedHashes []gethcommon.Hash - for _, hash := range hashes { + for i, hash := range hashes { found := false for _, msg := range result.Messages() { if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) { @@ -1196,7 +1201,9 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha Hash: hash, Event: common.EventEnvelopeSent, }) - } else { + } + + if !found && uint32(w.timesource.Now().Unix()) > relayTime[i]+messageExpiredPerid { missedHashes = append(missedHashes, hash) w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: hash,