fix: code review

This commit is contained in:
Richard Ramos 2024-09-30 17:46:35 -04:00
parent da402dc415
commit f29ce5fef0
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
3 changed files with 30 additions and 31 deletions

View File

@ -23,6 +23,7 @@ import (
const defaultBackoff = 10 * time.Second
const graylistBackoff = 3 * time.Minute
const storenodeVerificationInterval = time.Second
const storenodeMaxFailedRequests uint = 2
const isAndroidEmulator = runtime.GOOS == "android" && runtime.GOARCH == "amd64"
const findNearestMailServer = !isAndroidEmulator
@ -267,7 +268,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}
if pinnedStorenode != "" {
return m.connect(pinnedStorenode)
return m.setActiveStorenode(pinnedStorenode)
}
m.logger.Info("Finding a new storenode..")
@ -299,7 +300,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}
ms := allStorenodes[r.Int64()]
return m.connect(ms)
return m.setActiveStorenode(ms)
}
func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
@ -313,9 +314,7 @@ func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
return peer.status
}
func (m *StorenodeCycle) connect(peerID peer.ID) error {
m.logger.Info("connecting to storenode", zap.Stringer("peerID", peerID))
func (m *StorenodeCycle) setActiveStorenode(peerID peer.ID) error {
m.activeStorenode = peerID
m.StorenodeChangedEmitter.Emit(m.activeStorenode)
@ -363,7 +362,7 @@ func (m *StorenodeCycle) penalizeStorenode(id peer.ID) {
}
func (m *StorenodeCycle) verifyStorenodeStatus(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(storenodeVerificationInterval)
defer ticker.Stop()
for {

View File

@ -23,8 +23,8 @@ func (s *Emitter[T]) Emit(value T) {
s.Lock()
defer s.Unlock()
for _, subs := range s.subscriptions {
subs <- value
for _, sub := range s.subscriptions {
sub <- value
}
}

View File

@ -128,33 +128,33 @@ func TestStoreClient(t *testing.T) {
// -- First page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[1].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
// -- Second page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[3].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
// -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp())
require.Len(t, response.Messages(), 1)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[4].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
// -- Trying to continue a completed cursor
require.True(t, response.IsComplete())
require.Len(t, response.messages, 0)
require.Len(t, response.Messages(), 0)
err = response.Next(ctx)
require.NoError(t, err)
@ -165,26 +165,26 @@ func TestStoreClient(t *testing.T) {
// -- First page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[4].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
// -- Second page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Len(t, response.Messages(), 2)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Equal(t, response.Messages()[1].Message.GetTimestamp(), messages[2].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
// -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Len(t, response.Messages(), 1)
require.Equal(t, response.Messages()[0].Message.GetTimestamp(), messages[0].GetTimestamp())
err = response.Next(ctx)
require.NoError(t, err)
@ -197,13 +197,13 @@ func TestStoreClient(t *testing.T) {
// No cursor should be returned if there are no messages that match the criteria
response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2))
require.NoError(t, err)
require.Len(t, response.messages, 0)
require.Len(t, response.Messages(), 0)
require.Empty(t, response.Cursor())
// If the page size is larger than the number of existing messages, it should not return a cursor
response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100))
require.NoError(t, err)
require.Len(t, response.messages, 5)
require.Len(t, response.Messages(), 5)
require.Empty(t, response.Cursor())
// Invalid cursors should fail
@ -231,11 +231,11 @@ func TestStoreClient(t *testing.T) {
// Should not include data
response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID))
require.NoError(t, err)
require.Len(t, response.messages, 1)
require.Nil(t, response.messages[0].Message)
require.Len(t, response.Messages(), 1)
require.Nil(t, response.Messages()[0].Message)
response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test")}, IncludeData(false))
require.NoError(t, err)
require.GreaterOrEqual(t, len(response.messages), 1)
require.Nil(t, response.messages[0].Message)
require.GreaterOrEqual(t, len(response.Messages()), 1)
require.Nil(t, response.Messages()[0].Message)
}