From 9ed8f16a0add934d74d0bfbfa1a839f195008cdd Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 19 Apr 2023 16:54:33 -0400 Subject: [PATCH] feat: add metrics to archive protocol and discv5 --- waku/metrics/http.go | 9 ++- waku/persistence/store.go | 63 +++++++++++++++- waku/persistence/store_test.go | 4 +- waku/v2/discv5/discover.go | 5 ++ waku/v2/metrics/metrics.go | 75 ++++++++++++++----- waku/v2/protocol/filter/filter_test.go | 12 +-- .../legacy_filter/waku_filter_test.go | 12 +-- .../protocol/lightpush/waku_lightpush_test.go | 6 +- waku/v2/protocol/store/waku_resume_test.go | 16 ++-- waku/v2/protocol/store/waku_store_client.go | 2 - waku/v2/protocol/store/waku_store_common.go | 2 - waku/v2/protocol/store/waku_store_protocol.go | 38 ++-------- .../store/waku_store_protocol_test.go | 41 +++++----- .../protocol/store/waku_store_query_test.go | 15 ++-- waku/v2/rest/relay_test.go | 9 ++- waku/v2/rest/store_test.go | 7 +- 16 files changed, 202 insertions(+), 114 deletions(-) diff --git a/waku/metrics/http.go b/waku/metrics/http.go index 735636e9..a8e8c95a 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -61,12 +61,17 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server { metrics.FilterMessagesView, metrics.FilterRequestDurationView, metrics.FilterRequestsView, - metrics.StoreErrorTypesView, metrics.LightpushMessagesView, metrics.LightpushErrorTypesView, metrics.DnsDiscoveryNodesView, metrics.DnsDiscoveryErrorTypesView, - metrics.StoreMessagesView, + metrics.DiscV5ErrorTypesView, + metrics.StoreErrorTypesView, + metrics.StoreQueriesView, + metrics.ArchiveErrorTypesView, + metrics.ArchiveInsertDurationView, + metrics.ArchiveMessagesView, + metrics.ArchiveQueryDurationView, metrics.StoreErrorTypesView, metrics.StoreQueriesView, metrics.PeersView, diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 8a8a8fa9..840456a3 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" @@ -19,6 +20,7 @@ import ( type MessageProvider interface { GetAll() ([]StoredMessage, error) + Validate(env *protocol.Envelope) error Put(env *protocol.Envelope) error Query(query *pb.HistoryQuery) ([]StoredMessage, error) MostRecentTimestamp() (int64, error) @@ -27,10 +29,15 @@ type MessageProvider interface { } var ErrInvalidCursor = errors.New("invalid cursor") +var ErrFutureMessage = errors.New("message timestamp in the future") +var ErrMessageTooOld = errors.New("message too old") // WALMode for sqlite. const WALMode = "wal" +// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp +const MaxTimeVariance = time.Duration(20) * time.Second + // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { MessageProvider @@ -152,18 +159,39 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e d.cancel = cancel d.timesource = timesource - err := d.cleanOlderRecords() + err := d.cleanOlderRecords(ctx) if err != nil { return err } - d.wg.Add(1) + d.wg.Add(2) go d.checkForOlderRecords(ctx, 60*time.Second) + go d.updateMetrics(ctx) return nil } -func (d *DBStore) cleanOlderRecords() error { +func (store *DBStore) updateMetrics(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + defer store.wg.Done() + + for { + select { + case <-ticker.C: + msgCount, err := store.Count() + if err != nil { + store.log.Error("updating store metrics", zap.Error(err)) + } else { + metrics.RecordArchiveMessage(ctx, "stored", msgCount) + } + case <-ctx.Done(): + return + } + } +} + +func (d *DBStore) cleanOlderRecords(ctx context.Context) error { d.log.Info("Cleaning older records...") // Delete older messages @@ -172,6 +200,7 @@ func (d *DBStore) cleanOlderRecords() error { sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1` _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration))) if err != nil { + metrics.RecordArchiveError(ctx, "retpolicy_failure") return err } elapsed := time.Since(start) @@ -184,6 +213,7 @@ func (d *DBStore) cleanOlderRecords() error { sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET $1)` _, err := d.db.Exec(sqlStmt, d.maxMessages) if err != nil { + metrics.RecordArchiveError(ctx, "retpolicy_failure") return err } elapsed := time.Since(start) @@ -206,7 +236,7 @@ func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) { case <-ctx.Done(): return case <-ticker.C: - err := d.cleanOlderRecords() + err := d.cleanOlderRecords(ctx) if err != nil { d.log.Error("cleaning older records", zap.Error(err)) } @@ -225,19 +255,41 @@ func (d *DBStore) Stop() { d.db.Close() } +func (d *DBStore) Validate(env *protocol.Envelope) error { + n := time.Unix(0, env.Index().ReceiverTime) + upperBound := n.Add(MaxTimeVariance) + lowerBound := n.Add(-MaxTimeVariance) + + // Ensure that messages don't "jump" to the front of the queue with future timestamps + if env.Message().Timestamp > upperBound.UnixNano() { + return ErrFutureMessage + } + + if env.Message().Timestamp < lowerBound.UnixNano() { + return ErrMessageTooOld + } + + return nil +} + // Put inserts a WakuMessage into the DB func (d *DBStore) Put(env *protocol.Envelope) error { stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)") if err != nil { + metrics.RecordArchiveError(context.TODO(), "insert_failure") return err } cursor := env.Index() dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest) + + start := time.Now() _, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version) if err != nil { return err } + ellapsed := time.Since(start) + metrics.ArchiveInsertDurationSeconds.M(int64(ellapsed.Seconds())) err = stmt.Close() if err != nil { @@ -352,10 +404,13 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err pageSize := query.PagingInfo.PageSize + 1 parameters = append(parameters, pageSize) + measurementStart := time.Now() rows, err := stmt.Query(parameters...) if err != nil { return nil, nil, err } + ellapsed := time.Since(measurementStart) + metrics.ArchiveQueryDurationSeconds.M(int64(ellapsed.Seconds())) var result []StoredMessage for rows.Next() { diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 597de85b..08dceb6b 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -48,7 +48,7 @@ func TestDbStore(t *testing.T) { require.NoError(t, err) require.Empty(t, res) - err = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test", 1), utils.GetUnixEpoch(), "test")) + err = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test", utils.GetUnixEpoch()), utils.GetUnixEpoch(), "test")) require.NoError(t, err) res, err = store.GetAll() @@ -65,7 +65,7 @@ func TestStoreRetention(t *testing.T) { require.NoError(t, err) insertTime := time.Now() - + ////////////////////////////////// _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-70*time.Second).UnixNano()), insertTime.Add(-70*time.Second).UnixNano(), "test")) _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test2", insertTime.Add(-60*time.Second).UnixNano()), insertTime.Add(-60*time.Second).UnixNano(), "test")) _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-50*time.Second).UnixNano()), insertTime.Add(-50*time.Second).UnixNano(), "test")) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 788dbedb..c710ce54 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -14,6 +14,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-discover/discover" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -245,6 +246,7 @@ func evaluateNode(node *enode.Node) bool { _, err := utils.EnodeToPeerInfo(node) if err != nil { + metrics.RecordDiscV5Error(context.Background(), "peer_info_failure") utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) return false } @@ -264,6 +266,7 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) { func (d *DiscoveryV5) iterate(ctx context.Context) error { iterator, err := d.Iterator() if err != nil { + metrics.RecordDiscV5Error(context.Background(), "iterator_failure") return fmt.Errorf("obtaining iterator: %w", err) } @@ -294,12 +297,14 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error { _, addresses, err := utils.Multiaddress(iterator.Node()) if err != nil { + metrics.RecordDiscV5Error(context.Background(), "peer_info_failure") d.log.Error("extracting multiaddrs from enr", zap.Error(err)) continue } peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...) if err != nil { + metrics.RecordDiscV5Error(context.Background(), "peer_info_failure") d.log.Error("converting multiaddrs to addrinfos", zap.Error(err)) continue } diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 35bed0b7..29f76129 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -30,17 +30,23 @@ var ( FilterRequestDurationSeconds = stats.Int64("filter_request_duration_seconds", "Duration of Filter Subscribe Requests", stats.UnitSeconds) FilterHandleMessageDurationSeconds = stats.Int64("filter_handle_msessageduration_seconds", "Duration to Push Message to Filter Subscribers", stats.UnitSeconds) - StoredMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless) - StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless) - StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless) + StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless) + StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless) + + ArchiveMessages = stats.Int64("gowaku_archive_messages", "Number of historical messages", stats.UnitDimensionless) + ArchiveErrors = stats.Int64("gowaku_archive_errors", "Number of errors in archive protocol", stats.UnitDimensionless) + ArchiveInsertDurationSeconds = stats.Int64("gowaku_archive_insert_duration_seconds", "Message insertion duration", stats.UnitSeconds) + ArchiveQueryDurationSeconds = stats.Int64("gowaku_archive_query_duration_seconds", "History query duration", stats.UnitSeconds) LightpushMessages = stats.Int64("lightpush_messages", "Number of messages sent via lightpush protocol", stats.UnitDimensionless) LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless) PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless) - DnsDiscoveryNodes = stats.Int64("dnsdisc_nodes", "Number of discovered nodes", stats.UnitDimensionless) - DnsDiscoveryErrors = stats.Int64("errors", "Number of errors in dns discovery", stats.UnitDimensionless) + DnsDiscoveryNodes = stats.Int64("dnsdisc_nodes", "Number of discovered nodes in dns discovert", stats.UnitDimensionless) + DnsDiscoveryErrors = stats.Int64("dnsdisc_errors", "Number of errors in dns discovery", stats.UnitDimensionless) + + DiscV5Errors = stats.Int64("discv5_errors", "Number of errors in discv5", stats.UnitDimensionless) ) var ( @@ -75,13 +81,6 @@ var ( Description: "The number of the store queries received", Aggregation: view.Count(), } - StoreMessagesView = &view.View{ - Name: "gowaku_store_messages", - Measure: StoredMessages, - Description: "The distribution of the store protocol messages", - Aggregation: view.LastValue(), - TagKeys: []tag.Key{KeyType}, - } StoreErrorTypesView = &view.View{ Name: "gowaku_store_errors", Measure: StoreErrors, @@ -90,6 +89,34 @@ var ( TagKeys: []tag.Key{ErrorType}, } + ArchiveMessagesView = &view.View{ + Name: "gowaku_archive_messages", + Measure: ArchiveMessages, + Description: "The distribution of the archive protocol messages", + Aggregation: view.LastValue(), + TagKeys: []tag.Key{KeyType}, + } + ArchiveErrorTypesView = &view.View{ + Name: "gowaku_archive_errors", + Measure: StoreErrors, + Description: "Number of errors in archive protocol", + Aggregation: view.Count(), + TagKeys: []tag.Key{ErrorType}, + } + + ArchiveInsertDurationView = &view.View{ + Name: "gowaku_archive_insert_duration_seconds", + Measure: ArchiveInsertDurationSeconds, + Description: "Message insertion duration", + Aggregation: view.Count(), + } + ArchiveQueryDurationView = &view.View{ + Name: "gowaku_archive_query_duration_seconds", + Measure: ArchiveQueryDurationSeconds, + Description: "History query duration", + Aggregation: view.Count(), + } + LegacyFilterSubscriptionsView = &view.View{ Name: "gowaku_legacy_filter_subscriptions", Measure: LegacyFilterSubscriptions, @@ -143,24 +170,23 @@ var ( Aggregation: view.Count(), TagKeys: []tag.Key{ErrorType}, } + FilterRequestDurationView = &view.View{ Name: "gowaku_filter_request_duration_seconds", Measure: FilterRequestDurationSeconds, Description: "Duration of Filter Subscribe Requests", Aggregation: view.Count(), - TagKeys: []tag.Key{ErrorType}, } FilterHandleMessageDurationView = &view.View{ Name: "gowaku_filter_handle_msessageduration_seconds", Measure: FilterHandleMessageDurationSeconds, Description: "Duration to Push Message to Filter Subscribers", Aggregation: view.Count(), - TagKeys: []tag.Key{ErrorType}, } LightpushMessagesView = &view.View{ Name: "gowaku_lightpush_messages", - Measure: StoredMessages, + Measure: LightpushMessages, Description: "The distribution of the lightpush protocol messages", Aggregation: view.LastValue(), TagKeys: []tag.Key{KeyType}, @@ -192,6 +218,13 @@ var ( Aggregation: view.Count(), TagKeys: []tag.Key{ErrorType}, } + DiscV5ErrorTypesView = &view.View{ + Name: "gowaku_discv5_errors", + Measure: DiscV5Errors, + Description: "The distribution of the discv5 protocol errors", + Aggregation: view.Count(), + TagKeys: []tag.Key{ErrorType}, + } ) func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stats.Measurement) { @@ -214,6 +247,10 @@ func RecordLegacyFilterError(ctx context.Context, tagType string) { recordWithTags(ctx, ErrorType, tagType, LegacyFilterErrors.M(1)) } +func RecordArchiveError(ctx context.Context, tagType string) { + recordWithTags(ctx, ErrorType, tagType, ArchiveErrors.M(1)) +} + func RecordFilterError(ctx context.Context, tagType string) { recordWithTags(ctx, ErrorType, tagType, FilterErrors.M(1)) } @@ -245,8 +282,12 @@ func RecordDnsDiscoveryError(ctx context.Context, tagType string) { recordWithTags(ctx, ErrorType, tagType, DnsDiscoveryErrors.M(1)) } -func RecordStoreMessage(ctx context.Context, tagType string, len int) { - if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoredMessages.M(int64(len))); err != nil { +func RecordDiscV5Error(ctx context.Context, tagType string) { + recordWithTags(ctx, ErrorType, tagType, DiscV5Errors.M(1)) +} + +func RecordArchiveMessage(ctx context.Context, tagType string, len int) { + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, ArchiveMessages.M(int64(len))); err != nil { utils.Logger().Error("failed to record with tags", zap.Error(err)) } } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index a1245102..a63817ca 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -110,7 +110,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -127,7 +127,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -149,7 +149,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() } @@ -252,7 +252,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -262,7 +262,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) // TODO: find out how to eliminate this sleep @@ -271,7 +271,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(2 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) time.Sleep(2 * time.Second) diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index 002d75b3..8bc11792 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -109,7 +109,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -126,7 +126,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -148,7 +148,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() } @@ -210,7 +210,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) wg.Wait() @@ -220,7 +220,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) // TODO: find out how to eliminate this sleep @@ -229,7 +229,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(3 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) require.NoError(t, err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index dc84da79..6ff6e7d2 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -87,8 +87,8 @@ func TestWakuLightPush(t *testing.T) { err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) require.NoError(t, err) - msg1 := tests.CreateWakuMessage("test1", 0) - msg2 := tests.CreateWakuMessage("test2", 1) + msg1 := tests.CreateWakuMessage("test1", utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch()) req := new(pb.PushRequest) req.Message = msg1 @@ -147,6 +147,6 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(nil, utils.Logger()) client.SetHost(clientHost) - _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) + _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index f0fa19e0..4f0203ed 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -16,11 +16,12 @@ import ( ) func TestFindLastSeenMessage(t *testing.T) { - msg1 := protocol.NewEnvelope(tests.CreateWakuMessage("1", 1), utils.GetUnixEpoch(), "test") - msg2 := protocol.NewEnvelope(tests.CreateWakuMessage("2", 2), utils.GetUnixEpoch(), "test") - msg3 := protocol.NewEnvelope(tests.CreateWakuMessage("3", 3), utils.GetUnixEpoch(), "test") - msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test") - msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test") + now := utils.GetUnixEpoch() + msg1 := protocol.NewEnvelope(tests.CreateWakuMessage("1", now+1), utils.GetUnixEpoch(), "test") + msg2 := protocol.NewEnvelope(tests.CreateWakuMessage("2", now+2), utils.GetUnixEpoch(), "test") + msg3 := protocol.NewEnvelope(tests.CreateWakuMessage("3", now+3), utils.GetUnixEpoch(), "test") + msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", now+4), utils.GetUnixEpoch(), "test") + msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", now+5), utils.GetUnixEpoch(), "test") s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(msg1) @@ -49,13 +50,14 @@ func TestResume(t *testing.T) { defer s1.Stop() + now := utils.GetUnixEpoch() for i := 0; i < 10; i++ { var contentTopic = "1" if i%2 == 0 { contentTopic = "2" } - wakuMessage := tests.CreateWakuMessage(contentTopic, int64(i+1)) + wakuMessage := tests.CreateWakuMessage(contentTopic, now+int64(i+1)) msg := protocol.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), "test") _ = s1.storeMessage(msg) } @@ -108,7 +110,7 @@ func TestResumeWithListOfPeers(t *testing.T) { defer s1.Stop() - msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0} + msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: utils.GetUnixEpoch()} _ = s1.storeMessage(protocol.NewEnvelope(msg0, utils.GetUnixEpoch(), "test")) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 180391e5..47357b98 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -223,8 +223,6 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec }, nil } - metrics.RecordStoreMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages)) - return historyResponseRPC.Response, nil } diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 728e69c1..e6a863de 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -39,8 +39,6 @@ var ( // ErrFailedQuery is emitted when the query fails to return results ErrFailedQuery = errors.New("failed to resolve the query") - - ErrFutureMessage = errors.New("message timestamp in the future") ) type WakuSwap interface { diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 1859fd1f..c6db5c39 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -22,9 +22,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" ) -// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp -const MaxTimeVariance = time.Duration(20) * time.Second - func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) { if query.PagingInfo == nil { query.PagingInfo = &pb.PagingInfo{ @@ -78,6 +75,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse type MessageProvider interface { GetAll() ([]persistence.StoredMessage, error) Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error) + Validate(env *protocol.Envelope) error Put(env *protocol.Envelope) error MostRecentTimestamp() (int64, error) Start(ctx context.Context, timesource timesource.Timesource) error @@ -129,9 +127,8 @@ func (store *WakuStore) Start(ctx context.Context) error { store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest) - store.wg.Add(2) + store.wg.Add(1) go store.storeIncomingMessages(store.ctx) - go store.updateMetrics(store.ctx) store.log.Info("Store protocol started") @@ -139,16 +136,17 @@ func (store *WakuStore) Start(ctx context.Context) error { } func (store *WakuStore) storeMessage(env *protocol.Envelope) error { - // Ensure that messages don't "jump" to the front of the queue with future timestamps - if env.Index().SenderTime-env.Index().ReceiverTime > int64(MaxTimeVariance) { - return ErrFutureMessage - } if env.Message().Ephemeral { return nil } - err := store.msgProvider.Put(env) + err := store.msgProvider.Validate(env) + if err != nil { + return err + } + + err = store.msgProvider.Put(env) if err != nil { store.log.Error("storing message", zap.Error(err)) metrics.RecordStoreError(store.ctx, "store_failure") @@ -167,26 +165,6 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) { } } -func (store *WakuStore) updateMetrics(ctx context.Context) { - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - defer store.wg.Done() - - for { - select { - case <-ticker.C: - msgCount, err := store.msgProvider.Count() - if err != nil { - store.log.Error("updating store metrics", zap.Error(err)) - } else { - metrics.RecordStoreMessage(store.ctx, "stored", msgCount) - } - case <-ctx.Done(): - return - } - } -} - func (store *WakuStore) onRequest(s network.Stream) { defer s.Close() logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer())) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 92412f6d..2c57cbdc 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -122,11 +122,12 @@ func TestWakuStoreProtocolNext(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - msg1 := tests.CreateWakuMessage(topic1, 1) - msg2 := tests.CreateWakuMessage(topic1, 2) - msg3 := tests.CreateWakuMessage(topic1, 3) - msg4 := tests.CreateWakuMessage(topic1, 4) - msg5 := tests.CreateWakuMessage(topic1, 5) + now := utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage(topic1, now+1) + msg2 := tests.CreateWakuMessage(topic1, now+2) + msg3 := tests.CreateWakuMessage(topic1, now+3) + msg4 := tests.CreateWakuMessage(topic1, now+4) + msg5 := tests.CreateWakuMessage(topic1, now+5) s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) @@ -194,11 +195,12 @@ func TestWakuStoreResult(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - msg1 := tests.CreateWakuMessage(topic1, 1) - msg2 := tests.CreateWakuMessage(topic1, 2) - msg3 := tests.CreateWakuMessage(topic1, 3) - msg4 := tests.CreateWakuMessage(topic1, 4) - msg5 := tests.CreateWakuMessage(topic1, 5) + now := utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage(topic1, now+1) + msg2 := tests.CreateWakuMessage(topic1, now+2) + msg3 := tests.CreateWakuMessage(topic1, now+3) + msg4 := tests.CreateWakuMessage(topic1, now+4) + msg5 := tests.CreateWakuMessage(topic1, now+5) s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) @@ -282,15 +284,16 @@ func TestWakuStoreProtocolFind(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - msg1 := tests.CreateWakuMessage(topic1, 1) - msg2 := tests.CreateWakuMessage(topic1, 2) - msg3 := tests.CreateWakuMessage(topic1, 3) - msg4 := tests.CreateWakuMessage(topic1, 4) - msg5 := tests.CreateWakuMessage(topic1, 5) - msg6 := tests.CreateWakuMessage(topic1, 6) - msg7 := tests.CreateWakuMessage("hello", 7) - msg8 := tests.CreateWakuMessage(topic1, 8) - msg9 := tests.CreateWakuMessage(topic1, 9) + now := utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage(topic1, now+1) + msg2 := tests.CreateWakuMessage(topic1, now+2) + msg3 := tests.CreateWakuMessage(topic1, now+3) + msg4 := tests.CreateWakuMessage(topic1, now+4) + msg5 := tests.CreateWakuMessage(topic1, now+5) + msg6 := tests.CreateWakuMessage(topic1, now+6) + msg7 := tests.CreateWakuMessage("hello", now+7) + msg8 := tests.CreateWakuMessage(topic1, now+8) + msg9 := tests.CreateWakuMessage(topic1, now+9) s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index b90db2bc..b358ed51 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -206,12 +206,13 @@ func TestTemporalHistoryQueries(t *testing.T) { s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) var messages []*wpb.WakuMessage + now := utils.GetUnixEpoch() for i := 0; i < 10; i++ { contentTopic := "1" if i%2 == 0 { contentTopic = "2" } - msg := tests.CreateWakuMessage(contentTopic, int64(i)) + msg := tests.CreateWakuMessage(contentTopic, now+int64(i)) _ = s.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), "test")) messages = append(messages, msg) } @@ -219,8 +220,8 @@ func TestTemporalHistoryQueries(t *testing.T) { // handle temporal history query with a valid time window response := s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: int64(2), - EndTime: int64(5), + StartTime: now + 2, + EndTime: now + 5, }) require.Len(t, response.Messages, 2) @@ -230,8 +231,8 @@ func TestTemporalHistoryQueries(t *testing.T) { // handle temporal history query with a zero-size time window response = s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: int64(2), - EndTime: int64(2), + StartTime: now + 2, + EndTime: now + 2, }) require.Len(t, response.Messages, 0) @@ -239,8 +240,8 @@ func TestTemporalHistoryQueries(t *testing.T) { // handle temporal history query with an invalid time window response = s.FindMessages(&pb.HistoryQuery{ ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, - StartTime: int64(5), - EndTime: int64(2), + StartTime: now + 5, + EndTime: now + 2, }) // time window is invalid since start time > end time // perhaps it should return an error? diff --git a/waku/v2/rest/relay_test.go b/waku/v2/rest/relay_test.go index cf1ab711..7339702b 100644 --- a/waku/v2/rest/relay_test.go +++ b/waku/v2/rest/relay_test.go @@ -70,16 +70,17 @@ func TestRelaySubscription(t *testing.T) { require.Equal(t, "true", rr.Body.String()) // Test max messages in subscription - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 1), 1, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 2), 2, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 3), 3, "test")) + now := utils.GetUnixEpoch() + d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test")) + d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test")) + d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test")) // Wait for the messages to be processed time.Sleep(500 * time.Millisecond) require.Len(t, d.messages["test"], 3) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 4), 4, "test")) + d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test")) time.Sleep(500 * time.Millisecond) diff --git a/waku/v2/rest/store_test.go b/waku/v2/rest/store_test.go index e9d19f77..7f9da8b1 100644 --- a/waku/v2/rest/store_test.go +++ b/waku/v2/rest/store_test.go @@ -32,9 +32,10 @@ func TestGetMessages(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - msg1 := tests.CreateWakuMessage(topic1, 1) - msg2 := tests.CreateWakuMessage(topic1, 2) - msg3 := tests.CreateWakuMessage(topic1, 3) + now := utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage(topic1, now+1) + msg2 := tests.CreateWakuMessage(topic1, now+2) + msg3 := tests.CreateWakuMessage(topic1, now+3) node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)