From 62d4ce9b7ea56af5aad068b2ea68e079cf534498 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 21 Oct 2024 17:13:30 -0400 Subject: [PATCH] fix_: simplify `performStorenodeTask` --- protocol/messenger_mailserver.go | 42 +++++++------------ protocol/messenger_mailserver_cycle.go | 2 + protocol/messenger_storenode_comunity_test.go | 4 +- protocol/messenger_storenode_request_test.go | 6 +-- protocol/messenger_testing_utils.go | 2 + 5 files changed, 25 insertions(+), 31 deletions(-) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 410c68782..9cefd90ab 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -90,35 +90,25 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) { } func (m *Messenger) performStorenodeTask(task func() (*MessengerResponse, error), opts ...history.StorenodeTaskOption) (*MessengerResponse, error) { - responseCh := make(chan *MessengerResponse) - errCh := make(chan error) - - go func() { - defer gocommon.LogOnPanic() - err := m.transport.PerformStorenodeTask(func() error { - r, err := task() - if err != nil { - return err - } - - select { - case <-m.ctx.Done(): - return m.ctx.Err() - case responseCh <- r: - default: - // - } - - return nil - }, opts...) + responseCh := make(chan *MessengerResponse, 1) + err := m.transport.PerformStorenodeTask(func() error { + r, err := task() if err != nil { - errCh <- err + return err } - }() + + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case responseCh <- r: + return nil + } + }, opts...) + if err != nil { + return nil, err + } select { - case err := <-errCh: - return nil, err case r := <-responseCh: if r != nil { return r, nil @@ -784,7 +774,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { chat.SyncedFrom = uint32(batch.From.Unix()) } - m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From.Unix())), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) + m.logger.Debug("setting sync timestamps", zap.Int64("from", batch.From.Unix()), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID) from = uint32(batch.From.Unix()) diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index befeee89f..f7145ebb5 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -156,6 +156,8 @@ func (m *Messenger) Storenodes() ([]peer.ID, error) { } func (m *Messenger) checkForStorenodeCycleSignals() { + defer gocommon.LogOnPanic() + if m.transport.WakuVersion() != 2 { return } diff --git a/protocol/messenger_storenode_comunity_test.go b/protocol/messenger_storenode_comunity_test.go index 51cbf712f..c5f444585 100644 --- a/protocol/messenger_storenode_comunity_test.go +++ b/protocol/messenger_storenode_comunity_test.go @@ -149,7 +149,7 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd } func (s *MessengerStoreNodeCommunitySuite) createCommunityWithChat(m *Messenger) (*communities.Community, *Chat) { - WaitForAvailableStoreNode(&s.Suite, m, 500*time.Millisecond) + WaitForAvailableStoreNode(&s.Suite, m, context.TODO()) storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil) @@ -197,7 +197,7 @@ func (s *MessengerStoreNodeCommunitySuite) fetchCommunity(m *Messenger, communit WithWaitForResponseOption(true), } - fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options) + fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityShard, options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity) diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index f286f25b8..c6fdd0502 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -290,7 +290,7 @@ func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityS WithWaitForResponseOption(true), } - fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options) + fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityShard, options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity) @@ -808,7 +808,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() { } // Fetch the community - fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options) + fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(context.TODO(), community.CommunityShard(), options) s.Require().NoError(err) s.requireCommunitiesEqual(fetchedCommunity, community) @@ -1162,7 +1162,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() { } storeNodeRequestOptions = append(storeNodeRequestOptions, exampleToRun.CustomOptions...) - fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions) + fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityAddress, storeNodeRequestOptions) result.EnvelopesCount = stats.FetchedEnvelopesCount result.FetchedCommunity = fetchedCommunity diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 015d8c202..54aee55af 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -365,6 +365,8 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim } func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, ctx context.Context) { + ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() available := m.transport.WaitForAvailableStoreNode(ctx) s.Require().True(available) }