diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 5d4a8bc50..214e8e3e6 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -320,10 +320,6 @@ func (w *GethWakuWrapper) GetActiveStorenode() peer.ID { panic("not available in WakuV1") } -func (w *GethWakuWrapper) OnStorenodeAvailableOneShot() <-chan struct{} { - panic("not available in WakuV1") -} - func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID { panic("not available in WakuV1") } @@ -336,7 +332,7 @@ func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID { panic("not available in WakuV1") } -func (w *GethWakuWrapper) WaitForAvailableStoreNode(timeout time.Duration) bool { +func (w *GethWakuWrapper) WaitForAvailableStoreNode(ctx context.Context) bool { return false } diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index a30169839..4fc180fce 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -314,10 +314,6 @@ func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID { return w.waku.StorenodeCycle.GetActiveStorenode() } -func (w *gethWakuV2Wrapper) OnStorenodeAvailableOneShot() <-chan struct{} { - return w.waku.StorenodeCycle.StorenodeAvailableOneshotEmitter.Subscribe() -} - func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID { return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe() } @@ -330,8 +326,8 @@ func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID { return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe() } -func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool { - return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout) +func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(ctx context.Context) bool { + return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx) } func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { @@ -353,8 +349,8 @@ func (w *gethWakuV2Wrapper) ProcessMailserverBatch( } criteria := store.FilterCriteria{ - TimeStart: proto.Int64(int64(batch.From) * int64(time.Second)), - TimeEnd: proto.Int64(int64(batch.To) * int64(time.Second)), + TimeStart: proto.Int64(batch.From.UnixNano()), + TimeEnd: proto.Int64(batch.From.UnixNano()), ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index d38c28bbd..9e409ded9 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -202,9 +202,6 @@ type Waku interface { // GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active GetActiveStorenode() peer.ID - // OnStorenodeAvailableOneShot returns a channel that will be triggered only once when a storenode becomes available - OnStorenodeAvailableOneShot() <-chan struct{} - // OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed OnStorenodeChanged() <-chan peer.ID @@ -214,8 +211,8 @@ type Waku interface { // OnStorenodeAvailable is triggered when there is a new active storenode selected OnStorenodeAvailable() <-chan peer.ID - // WaitForAvailableStoreNode will wait for a storenode to be available until `timeout` happens - WaitForAvailableStoreNode(timeout time.Duration) bool + // WaitForAvailableStoreNode will wait for a storenode to be available depending on the context + WaitForAvailableStoreNode(ctx context.Context) bool // SetStorenodeConfigProvider will set the configuration provider for the storenode cycle SetStorenodeConfigProvider(c history.StorenodeConfigProvider) @@ -240,8 +237,8 @@ type Waku interface { } type MailserverBatch struct { - From uint32 - To uint32 + From time.Time + To time.Time Cursor string PubsubTopic string Topics []TopicType @@ -249,7 +246,7 @@ type MailserverBatch struct { } func (mb *MailserverBatch) Hash() string { - data := fmt.Sprintf("%d%d%s%s%v%v", mb.From, mb.To, mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs) + data := fmt.Sprintf("%d%d%s%s%v%v", mb.From.UnixNano(), mb.To.UnixNano(), mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs) hash := sha256.Sum256([]byte(data)) return hex.EncodeToString(hash[:4]) } diff --git a/go.mod b/go.mod index 19f9c2838..14ab53eba 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835 + github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 17db88701..c51826a90 100644 --- a/go.sum +++ b/go.sum @@ -2150,8 +2150,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835 h1:Vp6BhXiDEilmchHy8OLMZVhugudsnvveNkAKD5nhAGk= -github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM= +github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/protocol/messenger.go b/protocol/messenger.go index 2edcef58b..dd5b1c309 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -862,7 +862,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) { if m.archiveManager.IsReady() { go func() { defer gocommon.LogOnPanic() - <-m.transport.OnStorenodeAvailableOneShot() + + select { + case <-m.ctx.Done(): + return + case <-m.transport.OnStorenodeAvailable(): + } + m.InitHistoryArchiveTasks(controlledCommunities) }() } diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 9d7ad082b..b6e53ccc7 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3293,7 +3293,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities WithWaitForResponseOption(request.WaitForResponse), } - community, _, err := m.storeNodeRequestsManager.FetchCommunity(communityAddress, options) + community, _, err := m.storeNodeRequestsManager.FetchCommunity(m.ctx, communityAddress, options) return community, err } @@ -3301,7 +3301,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities // fetchCommunities installs filter for community and requests its details from store node. // When response received it will be passed through signals handler. func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error { - return m.storeNodeRequestsManager.FetchCommunities(communities, []StoreNodeRequestOption{}) + return m.storeNodeRequestsManager.FetchCommunities(m.ctx, communities, []StoreNodeRequestOption{}) } // passStoredCommunityInfoToSignalHandler calls signal handler with community info @@ -3972,7 +3972,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community } // Request possibly missed waku messages for community - ms := m.getCommunityMailserver(c.ID().String()) + ms := m.getCommunityStorenode(c.ID().String()) _, err = m.syncFiltersFrom(ms, filters, uint32(latestWakuMessageTimestamp)) if err != nil { m.logger.Error("failed to request missing messages", zap.Error(err)) @@ -5158,9 +5158,9 @@ func (m *Messenger) startRequestMissingCommunityChannelsHRKeysLoop() { }() } -// getCommunityMailserver returns the active mailserver if a communityID is present then it'll return the mailserver +// getCommunityStorenode returns the active mailserver if a communityID is present then it'll return the mailserver // for that community if it has a mailserver setup otherwise it'll return the global mailserver -func (m *Messenger) getCommunityMailserver(communityID ...string) peer.ID { +func (m *Messenger) getCommunityStorenode(communityID ...string) peer.ID { if m.transport.WakuVersion() != 2 { return "" } @@ -5178,7 +5178,11 @@ func (m *Messenger) getCommunityMailserver(communityID ...string) peer.ID { return m.transport.GetActiveStorenode() } - peerID, _ := ms.PeerID() + peerID, err := ms.PeerID() + if err != nil { + m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err)) + return m.transport.GetActiveStorenode() + } return peerID } diff --git a/protocol/messenger_contacts.go b/protocol/messenger_contacts.go index f9b15ddc2..2d0a0a83f 100644 --- a/protocol/messenger_contacts.go +++ b/protocol/messenger_contacts.go @@ -1321,7 +1321,7 @@ func (m *Messenger) FetchContact(contactID string, waitForResponse bool) (*Conta options := []StoreNodeRequestOption{ WithWaitForResponseOption(waitForResponse), } - contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, options) + contact, _, err := m.storeNodeRequestsManager.FetchContact(m.ctx, contactID, options) return contact, err } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 9bb7b7fef..410c68782 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -2,7 +2,6 @@ package protocol import ( "fmt" - "math" "sort" "time" @@ -69,7 +68,7 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) { go func() { defer gocommon.LogOnPanic() - peerID := m.getCommunityMailserver(chat.CommunityID) + peerID := m.getCommunityStorenode(chat.CommunityID) _, err = m.performStorenodeTask(func() (*MessengerResponse, error) { response, err := m.syncChatWithFilters(peerID, chat.ID) @@ -95,6 +94,7 @@ func (m *Messenger) performStorenodeTask(task func() (*MessengerResponse, error) errCh := make(chan error) go func() { + defer gocommon.LogOnPanic() err := m.transport.PerformStorenodeTask(func() error { r, err := task() if err != nil { @@ -102,6 +102,8 @@ func (m *Messenger) performStorenodeTask(task func() (*MessengerResponse, error) } select { + case <-m.ctx.Done(): + return m.ctx.Err() case responseCh <- r: default: // @@ -143,7 +145,7 @@ func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, erro // split filters by community store node so we can request the filters to the correct mailserver filtersByMs := m.SplitFiltersByStoreNode(filters) for communityID, filtersForMs := range filtersByMs { - peerID := m.getCommunityMailserver(communityID) + peerID := m.getCommunityStorenode(communityID) _, err := m.performStorenodeTask(func() (*MessengerResponse, error) { response, err := m.syncFilters(peerID, filtersForMs) @@ -166,15 +168,14 @@ func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, erro return true, nil } -func (m *Messenger) calculateMailserverTo() uint32 { - seconds := float64(m.GetCurrentTimeInMillis()) / 1000 - return uint32(math.Ceil(seconds)) +func (m *Messenger) calculateMailserverTo() time.Time { + return time.Unix(0, int64(time.Duration(m.GetCurrentTimeInMillis())*time.Millisecond)) } -func (m *Messenger) calculateMailserverTimeBounds(duration time.Duration) (uint32, uint32) { - now := float64(m.GetCurrentTimeInMillis()) / 1000 - to := uint32(math.Ceil(now)) - from := uint32(math.Floor(now)) - uint32(duration.Seconds()) +func (m *Messenger) calculateMailserverTimeBounds(duration time.Duration) (time.Time, time.Time) { + now := time.Unix(0, int64(time.Duration(m.GetCurrentTimeInMillis())*time.Millisecond)) + to := now + from := now.Add(-duration) return from, to } @@ -251,7 +252,7 @@ func (m *Messenger) syncBackup() error { from, to := m.calculateMailserverTimeBounds(oneMonthDuration) batch := types.MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}} - ms := m.getCommunityMailserver(filter.ChatID) + ms := m.getCommunityStorenode(filter.ChatID) err = m.processMailserverBatch(ms, batch) if err != nil { return err @@ -347,7 +348,7 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries filtersByMs := m.SplitFiltersByStoreNode(filters) allResponses := &MessengerResponse{} for communityID, filtersForMs := range filtersByMs { - peerID := m.getCommunityMailserver(communityID) + peerID := m.getCommunityStorenode(communityID) if withRetries { response, err := m.performStorenodeTask(func() (*MessengerResponse, error) { return m.syncFilters(peerID, filtersForMs) @@ -402,7 +403,7 @@ func (m *Messenger) checkForMissingMessagesLoop() { filters := m.transport.Filters() filtersByMs := m.SplitFiltersByStoreNode(filters) for communityID, filtersForMs := range filtersByMs { - peerID := m.getCommunityMailserver(communityID) + peerID := m.getCommunityStorenode(communityID) if peerID == "" { continue } @@ -533,7 +534,7 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, return nil, err } } - batch = types.MailserverBatch{From: from, To: to} + batch = types.MailserverBatch{From: time.Unix(int64(from), 0), To: to} } batch.ChatIDs = append(batch.ChatIDs, chatID) @@ -542,7 +543,7 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, batches[pubsubTopic][batchID] = batch // Set last request to the new `to` - topicData.LastRequest = int(to) + topicData.LastRequest = int(to.Unix()) syncedTopics = append(syncedTopics, topicData) } } @@ -574,8 +575,8 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, ChatIDs: batch.ChatIDs, } - from := batch.To - uint32(oneDayDuration.Seconds()) - if from > batch.From { + from := batch.To.Add(-oneDayDuration) + if from.After(batch.From) { dayBatch.From = from batches24h = append(batches24h, dayBatch) @@ -621,15 +622,15 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() { continue } - gap, err := m.calculateGapForChat(chat, batch.From) + gap, err := m.calculateGapForChat(chat, uint32(batch.From.Unix())) if err != nil { return nil, err } - if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { - chat.SyncedFrom = batch.From + if chat.SyncedFrom == 0 || chat.SyncedFrom > uint32(batch.From.Unix()) { + chat.SyncedFrom = uint32(batch.From.Unix()) } - chat.SyncedTo = to + chat.SyncedTo = uint32(to.Unix()) err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID) if err != nil { @@ -739,7 +740,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { return 0, ErrChatNotFound } - peerID := m.getCommunityMailserver(chat.CommunityID) + peerID := m.getCommunityStorenode(chat.CommunityID) var from uint32 _, err := m.performStorenodeTask(func() (*MessengerResponse, error) { canSync, err := m.canSyncWithStoreNodes() @@ -762,8 +763,8 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { batch := types.MailserverBatch{ ChatIDs: []string{chatID}, - To: chat.SyncedFrom, - From: chat.SyncedFrom - defaultSyncPeriod, + To: time.Unix(int64(chat.SyncedFrom), 0), + From: time.Unix(int64(chat.SyncedFrom-defaultSyncPeriod), 0), PubsubTopic: pubsubTopic, Topics: topics, } @@ -779,14 +780,14 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } - if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { - chat.SyncedFrom = batch.From + if chat.SyncedFrom == 0 || chat.SyncedFrom > uint32(batch.From.Unix()) { + chat.SyncedFrom = uint32(batch.From.Unix()) } - m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) + m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From.Unix())), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) - err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID) - from = batch.From + err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID) + from = uint32(batch.From.Unix()) return nil, err }, history.WithPeerID(peerID)) if err != nil { @@ -830,8 +831,8 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error { batch := types.MailserverBatch{ ChatIDs: []string{chatID}, - To: highestTo, - From: lowestFrom, + To: time.Unix(int64(highestTo), 0), + From: time.Unix(int64(lowestFrom), 0), PubsubTopic: pubsubTopic, Topics: topics, } @@ -840,7 +841,7 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error { m.config.messengerSignalsHandler.HistoryRequestStarted(1) } - peerID := m.getCommunityMailserver(chat.CommunityID) + peerID := m.getCommunityStorenode(chat.CommunityID) err = m.processMailserverBatch(peerID, batch) if err != nil { return err @@ -907,7 +908,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32 return 0, ErrChatNotFound } - peerID := m.getCommunityMailserver(chat.CommunityID) + peerID := m.getCommunityStorenode(chat.CommunityID) _, err := m.performStorenodeTask(func() (*MessengerResponse, error) { canSync, err := m.canSyncWithStoreNodes() if err != nil { @@ -917,7 +918,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32 return nil, nil } - m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.Stringer("storenodeID", peerID)) + m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.Stringer("peerID", peerID)) pubsubTopic, topics, err := m.topicsForChat(chatID) if err != nil { return nil, nil @@ -942,13 +943,13 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32 if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } - if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { - chat.SyncedFrom = batch.From + if chat.SyncedFrom == 0 || chat.SyncedFrom > uint32(batch.From.Second()) { + chat.SyncedFrom = uint32(batch.From.Second()) } - m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) + m.logger.Debug("setting sync timestamps", zap.Int64("from", batch.From.Unix()), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) - err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID) + err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID) from = batch.From return nil, err }, history.WithPeerID(peerID)) @@ -956,5 +957,5 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32 return 0, err } - return from, nil + return uint32(from.Unix()), nil } diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go index 26f875d65..fe725093f 100644 --- a/protocol/messenger_store_node_request_manager.go +++ b/protocol/messenger_store_node_request_manager.go @@ -1,6 +1,7 @@ package protocol import ( + "context" "database/sql" "fmt" "strings" @@ -75,7 +76,7 @@ func NewStoreNodeRequestManager(m *Messenger) *StoreNodeRequestManager { // the function will also wait for the store node response and return the fetched community. // Automatically waits for an available store node. // When a `nil` community and `nil` error is returned, that means the community wasn't found at the store node. -func (m *StoreNodeRequestManager) FetchCommunity(community communities.CommunityShard, opts []StoreNodeRequestOption) (*communities.Community, StoreNodeRequestStats, error) { +func (m *StoreNodeRequestManager) FetchCommunity(ctx context.Context, community communities.CommunityShard, opts []StoreNodeRequestOption) (*communities.Community, StoreNodeRequestStats, error) { cfg := buildStoreNodeRequestConfig(opts) m.logger.Info("requesting community from store node", @@ -83,7 +84,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community zap.Any("config", cfg)) requestCommunity := func(communityID string, shard *shard.Shard) (*communities.Community, StoreNodeRequestStats, error) { - channel, err := m.subscribeToRequest(storeNodeCommunityRequest, communityID, shard, cfg) + channel, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err) } @@ -100,7 +101,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community communityShard := community.Shard if communityShard == nil { id := transport.CommunityShardInfoTopic(community.CommunityID) - fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultNonProtectedShard(), cfg) + fetchedShard, err := m.subscribeToRequest(ctx, storeNodeShardRequest, id, shard.DefaultNonProtectedShard(), cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err) } @@ -134,7 +135,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community // those content topics is spammed with to many envelopes, then on each iteration we will have to fetch all // of this spam first to get the envelopes in other content topics. To avoid this we keep independent requests // for each content topic. -func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.CommunityShard, opts []StoreNodeRequestOption) error { +func (m *StoreNodeRequestManager) FetchCommunities(ctx context.Context, communities []communities.CommunityShard, opts []StoreNodeRequestOption) error { m.logger.Info("requesting communities from store node", zap.Any("communities", communities)) // when fetching multiple communities we don't wait for the response @@ -143,7 +144,7 @@ func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.Com var outErr error for _, community := range communities { - _, _, err := m.FetchCommunity(community, opts) + _, _, err := m.FetchCommunity(ctx, community, opts) if err != nil { outErr = fmt.Errorf("%sfailed to create a request for community %s: %w", outErr, community.CommunityID, err) } @@ -154,7 +155,7 @@ func (m *StoreNodeRequestManager) FetchCommunities(communities []communities.Com // FetchContact - similar to FetchCommunity // If a `nil` contact and a `nil` error are returned, it means that the contact wasn't found at the store node. -func (m *StoreNodeRequestManager) FetchContact(contactID string, opts []StoreNodeRequestOption) (*Contact, StoreNodeRequestStats, error) { +func (m *StoreNodeRequestManager) FetchContact(ctx context.Context, contactID string, opts []StoreNodeRequestOption) (*Contact, StoreNodeRequestStats, error) { cfg := buildStoreNodeRequestConfig(opts) @@ -162,7 +163,7 @@ func (m *StoreNodeRequestManager) FetchContact(contactID string, opts []StoreNod zap.Any("contactID", contactID), zap.Any("config", cfg)) - channel, err := m.subscribeToRequest(storeNodeContactRequest, contactID, nil, cfg) + channel, err := m.subscribeToRequest(ctx, storeNodeContactRequest, contactID, nil, cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err) } @@ -178,7 +179,7 @@ func (m *StoreNodeRequestManager) FetchContact(contactID string, opts []StoreNod // subscribeToRequest checks if a request for given community/contact is already in progress, creates and installs // a new one if not found, and returns a subscription to the result of the found/started request. // The subscription can then be used to get the result of the request, this could be either a community/contact or an error. -func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeRequestType, dataID string, shard *shard.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) { +func (m *StoreNodeRequestManager) subscribeToRequest(ctx context.Context, requestType storeNodeRequestType, dataID string, shard *shard.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) { // It's important to unlock only after getting the subscription channel. // We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription // created in this function will get the result even if the requests proceeds faster than this function ends. @@ -206,7 +207,7 @@ func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeReques return nil, fmt.Errorf("failed to create community filter: %w", err) } - request = m.newStoreNodeRequest() + request = m.newStoreNodeRequest(ctx) request.config = cfg request.pubsubTopic = filter.PubsubTopic request.requestID = requestID @@ -223,9 +224,10 @@ func (m *StoreNodeRequestManager) subscribeToRequest(requestType storeNodeReques } // newStoreNodeRequest creates a new storeNodeRequest struct -func (m *StoreNodeRequestManager) newStoreNodeRequest() *storeNodeRequest { +func (m *StoreNodeRequestManager) newStoreNodeRequest(ctx context.Context) *storeNodeRequest { return &storeNodeRequest{ manager: m, + ctx: ctx, subscriptions: make([]storeNodeResponseSubscription, 0), } } @@ -306,6 +308,7 @@ const ( // For a valid storeNodeRequest to be performed, the user must set all the struct fields and call start method. type storeNodeRequest struct { requestID storeNodeRequestID + ctx context.Context // request parameters pubsubTopic string @@ -524,13 +527,15 @@ func (r *storeNodeRequest) routine() { communityID := r.requestID.getCommunityID() if r.requestID.RequestType != storeNodeCommunityRequest || !r.manager.messenger.communityStorenodes.HasStorenodeSetup(communityID) { - if !r.manager.messenger.transport.WaitForAvailableStoreNode(storeNodeAvailableTimeout) { + ctx, cancel := context.WithTimeout(r.ctx, storeNodeAvailableTimeout) + defer cancel() + if !r.manager.messenger.transport.WaitForAvailableStoreNode(ctx) { r.result.err = fmt.Errorf("store node is not available") return } } - storeNode := r.manager.messenger.getCommunityMailserver(communityID) + storeNode := r.manager.messenger.getCommunityStorenode(communityID) // Check if community already exists locally and get Clock. if r.requestID.RequestType == storeNodeCommunityRequest { diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 84bede934..f286f25b8 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -310,7 +310,9 @@ func (s *MessengerStoreNodeRequestSuite) fetchProfile(m *Messenger, contactID st } func (s *MessengerStoreNodeRequestSuite) WaitForAvailableStoreNode(messenger *Messenger) { - WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), storeNodeConnectTimeout) + defer cancel() + WaitForAvailableStoreNode(&s.Suite, messenger, ctx) } func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) { diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index efed71fc0..015d8c202 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -364,8 +364,8 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim s.Require().True(ok) } -func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, timeout time.Duration) { - available := m.transport.WaitForAvailableStoreNode(timeout) +func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, ctx context.Context) { + available := m.transport.WaitForAvailableStoreNode(ctx) s.Require().True(available) } diff --git a/protocol/storenodes/storenodes.go b/protocol/storenodes/storenodes.go index c6188cfe1..dfc453f3d 100644 --- a/protocol/storenodes/storenodes.go +++ b/protocol/storenodes/storenodes.go @@ -62,10 +62,8 @@ func (m *CommunityStorenodes) IsCommunityStoreNode(peerID peer.ID) bool { for _, data := range m.storenodesByCommunityID { for _, snode := range data.storenodes { commStorenodeID, err := utils.GetPeerID(snode.Address) - if err == nil { - if commStorenodeID == peerID { - return true - } + if err == nil && commStorenodeID == peerID { + return true } } } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 3780639b2..0f8df437c 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -639,10 +639,6 @@ func (t *Transport) DisconnectActiveStorenode(ctx context.Context, backoffReason t.waku.DisconnectActiveStorenode(ctx, backoffReason, shouldCycle) } -func (t *Transport) OnStorenodeAvailableOneShot() <-chan struct{} { - return t.waku.OnStorenodeAvailableOneShot() -} - func (t *Transport) OnStorenodeChanged() <-chan peer.ID { return t.waku.OnStorenodeChanged() } @@ -655,8 +651,8 @@ func (t *Transport) OnStorenodeAvailable() <-chan peer.ID { return t.waku.OnStorenodeAvailable() } -func (t *Transport) WaitForAvailableStoreNode(timeout time.Duration) bool { - return t.waku.WaitForAvailableStoreNode(timeout) +func (t *Transport) WaitForAvailableStoreNode(ctx context.Context) bool { + return t.waku.WaitForAvailableStoreNode(ctx) } func (t *Transport) IsStorenodeAvailable(peerID peer.ID) bool { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/result.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/result.go new file mode 100644 index 000000000..d8ce175a9 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/result.go @@ -0,0 +1,17 @@ +package common + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type StoreRequestResult interface { + Cursor() []byte + IsComplete() bool + PeerID() peer.ID + Next(ctx context.Context, opts ...store.RequestOption) error // TODO: see how to decouple store.RequestOption + Messages() []*pb.WakuMessageKeyValue +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index 313ee0a45..d34531406 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -409,14 +409,10 @@ func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProv m.storenodeConfigProvider = provider } -func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool { - // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. +func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool { + // Note: Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. // This can be improved after merging https://github.com/status-im/status-go/pull/4380. // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately - timeout += time.Second - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() wg := sync.WaitGroup{} wg.Add(1) @@ -426,7 +422,18 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-m.StorenodeAvailableOneshotEmitter.Subscribe(): case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return + } + + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled + } + return + } } }() @@ -434,6 +441,11 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-waitForWaitGroup(&wg): case <-ctx.Done(): + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled o + } } return m.IsStorenodeAvailable(m.activeStorenode) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go new file mode 100644 index 000000000..248d61c6d --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go @@ -0,0 +1,33 @@ +package missing + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/api/common" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" +) + +func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor { + return &defaultStorenodeRequestor{ + store: store, + } +} + +type defaultStorenodeRequestor struct { + store *store.WakuStore +} + +func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { + return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) +} + +func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) { + return d.store.Query(ctx, store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), + TimeStart: from, + TimeEnd: to, + }, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false)) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 095e32419..1af991eb5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -11,9 +11,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -22,6 +22,7 @@ import ( const maxContentTopicsPerRequest = 10 const maxMsgHashesPerRequest = 50 +const messageFetchPageSize = 100 // MessageTracker should keep track of messages it has seen before and // provide a way to determine whether a message exists or not. This @@ -30,25 +31,30 @@ type MessageTracker interface { MessageExists(pb.MessageHash) (bool, error) } +type StorenodeRequestor interface { + GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) + QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) +} + // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria type MissingMessageVerifier struct { ctx context.Context params missingMessageVerifierParams - messageTracker MessageTracker + storenodeRequestor StorenodeRequestor + messageTracker MessageTracker criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex C <-chan *protocol.Envelope - store *store.WakuStore timesource timesource.Timesource logger *zap.Logger } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier -func NewMissingMessageVerifier(store *store.WakuStore, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { +func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { options = append(defaultMissingMessagesVerifierOptions, options...) params := missingMessageVerifierParams{} for _, opt := range options { @@ -56,11 +62,11 @@ func NewMissingMessageVerifier(store *store.WakuStore, messageTracker MessageTra } return &MissingMessageVerifier{ - store: store, - timesource: timesource, - messageTracker: messageTracker, - logger: logger.Named("missing-msg-verifier"), - params: params, + storenodeRequestor: storenodeRequester, + timesource: timesource, + messageTracker: messageTracker, + logger: logger.Named("missing-msg-verifier"), + params: params, } } @@ -178,7 +184,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } } -func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (store.Result, error), logger *zap.Logger, logMsg string) (store.Result, error) { +func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.StoreRequestResult, error), logger *zap.Logger, logMsg string) (common.StoreRequestResult, error) { retry := true count := 1 for retry && count <= m.params.maxAttemptsToRetrieveHistory { @@ -212,12 +218,16 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, logging.Epoch("to", now), ) - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { - return m.store.Query(ctx, store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(interest.contentFilter.PubsubTopic, contentTopics[batchFrom:batchTo]...), - TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), - TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), - }, store.WithPeer(interest.peerID), store.WithPaging(false, 100), store.IncludeData(false)) + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { + return m.storenodeRequestor.QueryWithCriteria( + ctx, + interest.peerID, + messageFetchPageSize, + interest.contentFilter.PubsubTopic, + contentTopics[batchFrom:batchTo], + proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + proto.Int64(now.Add(-m.params.delay).UnixNano()), + ) }, logger, "retrieving history to check for missing messages") if err != nil { if !errors.Is(err, context.Canceled) { @@ -243,7 +253,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, missingHashes = append(missingHashes, hash) } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { if err = result.Next(ctx); err != nil { return nil, err } @@ -282,10 +292,10 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer utils.LogOnPanic() defer wg.Wait() - result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { + result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() - return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) + return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { @@ -303,7 +313,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, } } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (store.Result, error) { + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { if err = result.Next(ctx); err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_publisher.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_publisher.go new file mode 100644 index 000000000..4ca940cef --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_publisher.go @@ -0,0 +1,50 @@ +package publish + +import ( + "context" + "errors" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" +) + +var ErrRelayNotAvailable = errors.New("relay is not available") +var ErrLightpushNotAvailable = errors.New("lightpush is not available") + +func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher { + return &defaultPublisher{ + lightpush: lightpush, + relay: relay, + } +} + +type defaultPublisher struct { + lightpush *lightpush.WakuLightPush + relay *relay.WakuRelay +} + +func (d *defaultPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) { + if d.relay == nil { + return nil, ErrRelayNotAvailable + } + + return d.relay.PubSub().ListPeers(pubsubTopic), nil +} + +func (d *defaultPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { + if d.relay == nil { + return pb.MessageHash{}, ErrRelayNotAvailable + } + + return d.relay.Publish(ctx, message, relay.WithPubSubTopic(pubsubTopic)) +} + +func (d *defaultPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) { + if d.lightpush == nil { + return pb.MessageHash{}, ErrLightpushNotAvailable + } + + return d.lightpush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers)) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go new file mode 100644 index 000000000..68eca0304 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/default_verifier.go @@ -0,0 +1,39 @@ +package publish + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" +) + +func NewDefaultStorenodeMessageVerifier(store *store.WakuStore) StorenodeMessageVerifier { + return &defaultStorenodeMessageVerifier{ + store: store, + } +} + +type defaultStorenodeMessageVerifier struct { + store *store.WakuStore +} + +func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { + var opts []store.RequestOption + opts = append(opts, store.WithRequestID(requestID)) + opts = append(opts, store.WithPeer(peerID)) + opts = append(opts, store.WithPaging(false, pageSize)) + opts = append(opts, store.IncludeData(false)) + + response, err := d.store.QueryByHash(ctx, messageHashes, opts...) + if err != nil { + return nil, err + } + + result := make([]pb.MessageHash, len(response.Messages())) + for i, msg := range response.Messages() { + result[i] = msg.WakuMessageHash() + } + + return result, nil +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index 059165740..8a37e20ce 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -8,11 +8,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p/core/peer" apicommon "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -31,6 +31,11 @@ type ISentCheck interface { DeleteByMessageIDs(messageIDs []common.Hash) } +type StorenodeMessageVerifier interface { + // MessagesExist returns a list of the messages it found from a list of message hashes + MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) +} + // MessageSentCheck tracks the outgoing messages and check against store node // if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query // if the message keeps missing after `messageExpiredPerid`, the message id will be expired @@ -40,7 +45,7 @@ type MessageSentCheck struct { messageStoredChan chan common.Hash messageExpiredChan chan common.Hash ctx context.Context - store *store.WakuStore + messageVerifier StorenodeMessageVerifier storenodeCycle *history.StorenodeCycle timesource timesource.Timesource logger *zap.Logger @@ -52,14 +57,14 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, messageVerifier StorenodeMessageVerifier, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, messageStoredChan: msgStoredChan, messageExpiredChan: msgExpiredChan, ctx: ctx, - store: store, + messageVerifier: messageVerifier, storenodeCycle: cycle, timesource: timesource, logger: logger, @@ -212,12 +217,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c return []common.Hash{} } - var opts []store.RequestOption requestID := protocol.GenerateRequestID() - opts = append(opts, store.WithRequestID(requestID)) - opts = append(opts, store.WithPeer(selectedPeer)) - opts = append(opts, store.WithPaging(false, m.maxHashQueryLength)) - opts = append(opts, store.IncludeData(false)) messageHashes := make([]pb.MessageHash, len(hashes)) for i, hash := range hashes { @@ -228,20 +228,20 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) defer cancel() - result, err := m.store.QueryByHash(queryCtx, messageHashes, opts...) + result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes) if err != nil { m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) return []common.Hash{} } - m.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result.Messages()))) + m.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result))) var ackHashes []common.Hash var missedHashes []common.Hash for i, hash := range hashes { found := false - for _, msg := range result.Messages() { - if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) { + for _, msgHash := range result { + if bytes.Equal(msgHash.Bytes(), hash.Bytes()) { found = true break } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go index c1e9a4ca7..c457589e7 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go @@ -6,9 +6,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -36,10 +36,20 @@ func (pm PublishMethod) String() string { } } +type Publisher interface { + // RelayListPeers returns the list of peers for a pubsub topic + RelayListPeers(pubsubTopic string) ([]peer.ID, error) + + // RelayPublish publishes a message via WakuRelay + RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) + + // LightpushPublish publishes a message via WakuLightPush + LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) +} + type MessageSender struct { publishMethod PublishMethod - lightPush *lightpush.WakuLightPush - relay *relay.WakuRelay + publisher Publisher messageSentCheck ISentCheck rateLimiter *PublishRateLimiter logger *zap.Logger @@ -64,14 +74,13 @@ func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request { return r } -func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) (*MessageSender, error) { +func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger *zap.Logger) (*MessageSender, error) { if publishMethod == UnknownMethod { return nil, errors.New("publish method is required") } return &MessageSender{ publishMethod: publishMethod, - lightPush: lightPush, - relay: relay, + publisher: publisher, rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), logger: logger, }, nil @@ -108,26 +117,23 @@ func (ms *MessageSender) Send(req *Request) error { switch publishMethod { case LightPush: - if ms.lightPush == nil { - return errors.New("lightpush is not available") - } logger.Info("publishing message via lightpush") - _, err := ms.lightPush.Publish( + _, err := ms.publisher.LightpushPublish( req.ctx, req.envelope.Message(), - lightpush.WithPubSubTopic(req.envelope.PubsubTopic()), - lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush), + req.envelope.PubsubTopic(), + DefaultPeersToPublishForLightpush, ) if err != nil { return err } case Relay: - if ms.relay == nil { - return errors.New("relay is not available") + peers, err := ms.publisher.RelayListPeers(req.envelope.PubsubTopic()) + if err != nil { + return err } - peerCnt := len(ms.relay.PubSub().ListPeers(req.envelope.PubsubTopic())) - logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := ms.relay.Publish(req.ctx, req.envelope.Message(), relay.WithPubSubTopic(req.envelope.PubsubTopic())) + logger.Info("publishing message via relay", zap.Int("peerCnt", len(peers))) + _, err = ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic()) if err != nil { return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 090ef8f04..6b3c9b2e1 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -263,7 +263,7 @@ func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) ( } func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", storeRequest.RequestId)) logger.Debug("sending store request") diff --git a/vendor/modules.txt b/vendor/modules.txt index 59f44dd34..ef4aaca80 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1040,7 +1040,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835 +# github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests