From f29ce5fef0815837e64391b00651e59d788827d4 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 30 Sep 2024 17:46:35 -0400 Subject: [PATCH] fix: code review --- waku/v2/api/history/cycle.go | 11 +++---- waku/v2/api/history/emitters.go | 4 +-- waku/v2/protocol/store/client_test.go | 46 +++++++++++++-------------- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index 6666dcec..b60ab3ed 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -23,6 +23,7 @@ import ( const defaultBackoff = 10 * time.Second const graylistBackoff = 3 * time.Minute +const storenodeVerificationInterval = time.Second const storenodeMaxFailedRequests uint = 2 const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64" const findNearestMailServer = !isAndroidEmulator @@ -267,7 +268,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { } if pinnedStorenode != "" { - return m.connect(pinnedStorenode) + return m.setActiveStorenode(pinnedStorenode) } m.logger.Info("Finding a new storenode..") @@ -299,7 +300,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { } ms := allStorenodes[r.Int64()] - return m.connect(ms) + return m.setActiveStorenode(ms) } func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { @@ -313,9 +314,7 @@ func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { return peer.status } -func (m *StorenodeCycle) connect(peerID peer.ID) error { - m.logger.Info("connecting to storenode", zap.Stringer("peerID", peerID)) - +func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error { m.activeStorenode = peerID m.StorenodeChangedEmitter.Emit(m.activeStorenode) @@ -363,7 +362,7 @@ func (m *StorenodeCycle) penalizeStorenode(id peer.ID) { } func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) { - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(storenodeVerificationInterval) defer ticker.Stop() for { diff --git a/waku/v2/api/history/emitters.go b/waku/v2/api/history/emitters.go index 2d055954..a12d4db9 100644 --- a/waku/v2/api/history/emitters.go +++ b/waku/v2/api/history/emitters.go @@ -23,8 +23,8 @@ func (s *Emitter[T]) Emit(value T) { s.Lock() defer s.Unlock() - for _, subs := range s.subscriptions { - subs <- value + for _, sub := range s.subscriptions { + sub <- value } } diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 733a27e9..22db524b 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -128,33 +128,33 @@ func TestStoreClient(t *testing.T) { // -- First page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[1].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Second page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[2].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[3].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Third page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 1) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp()) + require.Len(t, response.Messages(), 1) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[4].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Trying to continue a completed cursor require.True(t, response.IsComplete()) - require.Len(t, response.messages, 0) + require.Len(t, response.Messages(), 0) err = response.Next(ctx) require.NoError(t, err) @@ -165,26 +165,26 @@ func TestStoreClient(t *testing.T) { // -- First page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[3].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[4].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Second page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 2) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp()) - require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp()) + require.Len(t, response.Messages(), 2) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[1].GetTimestamp()) + require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[2].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) // -- Third page: require.False(t, response.IsComplete()) - require.Len(t, response.messages, 1) - require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) + require.Len(t, response.Messages(), 1) + require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp()) err = response.Next(ctx) require.NoError(t, err) @@ -197,13 +197,13 @@ func TestStoreClient(t *testing.T) { // No cursor should be returned if there are no messages that match the criteria response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2)) require.NoError(t, err) - require.Len(t, response.messages, 0) + require.Len(t, response.Messages(), 0) require.Empty(t, response.Cursor()) // If the page size is larger than the number of existing messages, it should not return a cursor response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100)) require.NoError(t, err) - require.Len(t, response.messages, 5) + require.Len(t, response.Messages(), 5) require.Empty(t, response.Cursor()) // Invalid cursors should fail @@ -231,11 +231,11 @@ func TestStoreClient(t *testing.T) { // Should not include data response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID)) require.NoError(t, err) - require.Len(t, response.messages, 1) - require.Nil(t, response.messages[0].Message) + require.Len(t, response.Messages(), 1) + require.Nil(t, response.Messages()[0].Message) response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test")}, IncludeData(false)) require.NoError(t, err) - require.GreaterOrEqual(t, len(response.messages), 1) - require.Nil(t, response.messages[0].Message) + require.GreaterOrEqual(t, len(response.Messages()), 1) + require.Nil(t, response.Messages()[0].Message) }