Mirror of https://github.com/logos-messaging/logos-messaging-go.git, synced 2026-01-08 08:53:12 +00:00
feat: add metrics to archive protocol and discv5
This commit is contained in: parent f399fe00f4, commit 9ed8f16a0a
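The diff below adds Archive* and DiscV5 measures, views, and recording helpers to waku/v2/metrics, registers the new views in the metrics server, and records them from the persistence layer (DBStore) and the discv5 code. It also moves timestamp validation into the message provider (a new Validate method and a periodic updateMetrics loop in DBStore) and switches several tests from hard-coded timestamps to utils.GetUnixEpoch(). As a rough usage sketch (not part of the commit; the context and count value are illustrative), callers record the new metrics through the added helpers:

package main

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/metrics"
)

func main() {
	ctx := context.Background()

	// Count an archive protocol error, tagged with its type (tag values mirror the diff).
	metrics.RecordArchiveError(ctx, "retpolicy_failure")

	// Report the current number of stored messages; 42 is an illustrative count.
	metrics.RecordArchiveMessage(ctx, "stored", 42)

	// Count a discv5 failure while resolving peer info.
	metrics.RecordDiscV5Error(ctx, "peer_info_failure")
}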
@@ -61,12 +61,17 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server {
metrics.FilterMessagesView,
metrics.FilterRequestDurationView,
metrics.FilterRequestsView,
metrics.StoreErrorTypesView,
metrics.LightpushMessagesView,
metrics.LightpushErrorTypesView,
metrics.DnsDiscoveryNodesView,
metrics.DnsDiscoveryErrorTypesView,
metrics.StoreMessagesView,
metrics.DiscV5ErrorTypesView,
metrics.StoreErrorTypesView,
metrics.StoreQueriesView,
metrics.ArchiveErrorTypesView,
metrics.ArchiveInsertDurationView,
metrics.ArchiveMessagesView,
metrics.ArchiveQueryDurationView,
metrics.StoreErrorTypesView,
metrics.StoreQueriesView,
metrics.PeersView,
@@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
@@ -19,6 +20,7 @@ import (

type MessageProvider interface {
GetAll() ([]StoredMessage, error)
Validate(env *protocol.Envelope) error
Put(env *protocol.Envelope) error
Query(query *pb.HistoryQuery) ([]StoredMessage, error)
MostRecentTimestamp() (int64, error)
@@ -27,10 +29,15 @@ type MessageProvider interface {
}

var ErrInvalidCursor = errors.New("invalid cursor")
var ErrFutureMessage = errors.New("message timestamp in the future")
var ErrMessageTooOld = errors.New("message too old")

// WALMode for sqlite.
const WALMode = "wal"

// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second

// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
MessageProvider
@@ -152,18 +159,39 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
d.cancel = cancel
d.timesource = timesource

err := d.cleanOlderRecords()
err := d.cleanOlderRecords(ctx)
if err != nil {
return err
}

d.wg.Add(1)
d.wg.Add(2)
go d.checkForOlderRecords(ctx, 60*time.Second)
go d.updateMetrics(ctx)

return nil
}

func (d *DBStore) cleanOlderRecords() error {
func (store *DBStore) updateMetrics(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
defer store.wg.Done()

for {
select {
case <-ticker.C:
msgCount, err := store.Count()
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordArchiveMessage(ctx, "stored", msgCount)
}
case <-ctx.Done():
return
}
}
}

func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
d.log.Info("Cleaning older records...")

// Delete older messages
@@ -172,6 +200,7 @@ func (d *DBStore) cleanOlderRecords() error {
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < $1`
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration)))
if err != nil {
metrics.RecordArchiveError(ctx, "retpolicy_failure")
return err
}
elapsed := time.Since(start)
@@ -184,6 +213,7 @@ func (d *DBStore) cleanOlderRecords() error {
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET $1)`
_, err := d.db.Exec(sqlStmt, d.maxMessages)
if err != nil {
metrics.RecordArchiveError(ctx, "retpolicy_failure")
return err
}
elapsed := time.Since(start)
@@ -206,7 +236,7 @@ func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
case <-ctx.Done():
return
case <-ticker.C:
err := d.cleanOlderRecords()
err := d.cleanOlderRecords(ctx)
if err != nil {
d.log.Error("cleaning older records", zap.Error(err))
}
@@ -225,19 +255,41 @@ func (d *DBStore) Stop() {
d.db.Close()
}

func (d *DBStore) Validate(env *protocol.Envelope) error {
n := time.Unix(0, env.Index().ReceiverTime)
upperBound := n.Add(MaxTimeVariance)
lowerBound := n.Add(-MaxTimeVariance)

// Ensure that messages don't "jump" to the front of the queue with future timestamps
if env.Message().Timestamp > upperBound.UnixNano() {
return ErrFutureMessage
}

if env.Message().Timestamp < lowerBound.UnixNano() {
return ErrMessageTooOld
}

return nil
}

// Put inserts a WakuMessage into the DB
func (d *DBStore) Put(env *protocol.Envelope) error {
stmt, err := d.db.Prepare("INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES ($1, $2, $3, $4, $5, $6, $7)")
if err != nil {
metrics.RecordArchiveError(context.TODO(), "insert_failure")
return err
}

cursor := env.Index()
dbKey := NewDBKey(uint64(cursor.SenderTime), uint64(cursor.ReceiverTime), env.PubsubTopic(), env.Index().Digest)

start := time.Now()
_, err = stmt.Exec(dbKey.Bytes(), cursor.ReceiverTime, env.Message().Timestamp, env.Message().ContentTopic, env.PubsubTopic(), env.Message().Payload, env.Message().Version)
if err != nil {
return err
}
ellapsed := time.Since(start)
metrics.ArchiveInsertDurationSeconds.M(int64(ellapsed.Seconds()))

err = stmt.Close()
if err != nil {
@@ -352,10 +404,13 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
pageSize := query.PagingInfo.PageSize + 1

parameters = append(parameters, pageSize)
measurementStart := time.Now()
rows, err := stmt.Query(parameters...)
if err != nil {
return nil, nil, err
}
ellapsed := time.Since(measurementStart)
metrics.ArchiveQueryDurationSeconds.M(int64(ellapsed.Seconds()))

var result []StoredMessage
for rows.Next() {
@@ -48,7 +48,7 @@ func TestDbStore(t *testing.T) {
require.NoError(t, err)
require.Empty(t, res)

err = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test", 1), utils.GetUnixEpoch(), "test"))
err = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test", utils.GetUnixEpoch()), utils.GetUnixEpoch(), "test"))
require.NoError(t, err)

res, err = store.GetAll()
@@ -65,7 +65,7 @@ func TestStoreRetention(t *testing.T) {
require.NoError(t, err)

insertTime := time.Now()

//////////////////////////////////
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-70*time.Second).UnixNano()), insertTime.Add(-70*time.Second).UnixNano(), "test"))
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test2", insertTime.Add(-60*time.Second).UnixNano()), insertTime.Add(-60*time.Second).UnixNano(), "test"))
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-50*time.Second).UnixNano()), insertTime.Add(-50*time.Second).UnixNano(), "test"))
@@ -14,6 +14,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"

@@ -245,6 +246,7 @@ func evaluateNode(node *enode.Node) bool {
_, err := utils.EnodeToPeerInfo(node)

if err != nil {
metrics.RecordDiscV5Error(context.Background(), "peer_info_failure")
utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
return false
}
@@ -264,6 +266,7 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
func (d *DiscoveryV5) iterate(ctx context.Context) error {
iterator, err := d.Iterator()
if err != nil {
metrics.RecordDiscV5Error(context.Background(), "iterator_failure")
return fmt.Errorf("obtaining iterator: %w", err)
}

@@ -294,12 +297,14 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error {

_, addresses, err := utils.Multiaddress(iterator.Node())
if err != nil {
metrics.RecordDiscV5Error(context.Background(), "peer_info_failure")
d.log.Error("extracting multiaddrs from enr", zap.Error(err))
continue
}

peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...)
if err != nil {
metrics.RecordDiscV5Error(context.Background(), "peer_info_failure")
d.log.Error("converting multiaddrs to addrinfos", zap.Error(err))
continue
}
@@ -30,17 +30,23 @@ var (
FilterRequestDurationSeconds = stats.Int64("filter_request_duration_seconds", "Duration of Filter Subscribe Requests", stats.UnitSeconds)
FilterHandleMessageDurationSeconds = stats.Int64("filter_handle_msessageduration_seconds", "Duration to Push Message to Filter Subscribers", stats.UnitSeconds)

StoredMessages = stats.Int64("store_messages", "Number of historical messages", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
StoreQueries = stats.Int64("store_queries", "Number of store queries", stats.UnitDimensionless)

ArchiveMessages = stats.Int64("gowaku_archive_messages", "Number of historical messages", stats.UnitDimensionless)
ArchiveErrors = stats.Int64("gowaku_archive_errors", "Number of errors in archive protocol", stats.UnitDimensionless)
ArchiveInsertDurationSeconds = stats.Int64("gowaku_archive_insert_duration_seconds", "Message insertion duration", stats.UnitSeconds)
ArchiveQueryDurationSeconds = stats.Int64("gowaku_archive_query_duration_seconds", "History query duration", stats.UnitSeconds)

LightpushMessages = stats.Int64("lightpush_messages", "Number of messages sent via lightpush protocol", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)

PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)

DnsDiscoveryNodes = stats.Int64("dnsdisc_nodes", "Number of discovered nodes", stats.UnitDimensionless)
DnsDiscoveryErrors = stats.Int64("errors", "Number of errors in dns discovery", stats.UnitDimensionless)
DnsDiscoveryNodes = stats.Int64("dnsdisc_nodes", "Number of discovered nodes in dns discovert", stats.UnitDimensionless)
DnsDiscoveryErrors = stats.Int64("dnsdisc_errors", "Number of errors in dns discovery", stats.UnitDimensionless)

DiscV5Errors = stats.Int64("discv5_errors", "Number of errors in discv5", stats.UnitDimensionless)
)

var (
@@ -75,13 +81,6 @@ var (
Description: "The number of the store queries received",
Aggregation: view.Count(),
}
StoreMessagesView = &view.View{
Name: "gowaku_store_messages",
Measure: StoredMessages,
Description: "The distribution of the store protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
}
StoreErrorTypesView = &view.View{
Name: "gowaku_store_errors",
Measure: StoreErrors,
@@ -90,6 +89,34 @@ var (
TagKeys: []tag.Key{ErrorType},
}

ArchiveMessagesView = &view.View{
Name: "gowaku_archive_messages",
Measure: ArchiveMessages,
Description: "The distribution of the archive protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
}
ArchiveErrorTypesView = &view.View{
Name: "gowaku_archive_errors",
Measure: StoreErrors,
Description: "Number of errors in archive protocol",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}

ArchiveInsertDurationView = &view.View{
Name: "gowaku_archive_insert_duration_seconds",
Measure: ArchiveInsertDurationSeconds,
Description: "Message insertion duration",
Aggregation: view.Count(),
}
ArchiveQueryDurationView = &view.View{
Name: "gowaku_archive_query_duration_seconds",
Measure: ArchiveQueryDurationSeconds,
Description: "History query duration",
Aggregation: view.Count(),
}

LegacyFilterSubscriptionsView = &view.View{
Name: "gowaku_legacy_filter_subscriptions",
Measure: LegacyFilterSubscriptions,
@@ -143,24 +170,23 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}

FilterRequestDurationView = &view.View{
Name: "gowaku_filter_request_duration_seconds",
Measure: FilterRequestDurationSeconds,
Description: "Duration of Filter Subscribe Requests",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
FilterHandleMessageDurationView = &view.View{
Name: "gowaku_filter_handle_msessageduration_seconds",
Measure: FilterHandleMessageDurationSeconds,
Description: "Duration to Push Message to Filter Subscribers",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}

LightpushMessagesView = &view.View{
Name: "gowaku_lightpush_messages",
Measure: StoredMessages,
Measure: LightpushMessages,
Description: "The distribution of the lightpush protocol messages",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{KeyType},
@@ -192,6 +218,13 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
DiscV5ErrorTypesView = &view.View{
Name: "gowaku_discv5_errors",
Measure: DiscV5Errors,
Description: "The distribution of the discv5 protocol errors",
Aggregation: view.Count(),
TagKeys: []tag.Key{ErrorType},
}
)

func recordWithTags(ctx context.Context, tagKey tag.Key, tagType string, ms stats.Measurement) {
@@ -214,6 +247,10 @@ func RecordLegacyFilterError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, LegacyFilterErrors.M(1))
}

func RecordArchiveError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, ArchiveErrors.M(1))
}

func RecordFilterError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, FilterErrors.M(1))
}
@@ -245,8 +282,12 @@ func RecordDnsDiscoveryError(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, DnsDiscoveryErrors.M(1))
}

func RecordStoreMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoredMessages.M(int64(len))); err != nil {
func RecordDiscV5Error(ctx context.Context, tagType string) {
recordWithTags(ctx, ErrorType, tagType, DiscV5Errors.M(1))
}

func RecordArchiveMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, ArchiveMessages.M(int64(len))); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}
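For context, the measures and views above follow the usual OpenCensus pattern: define a measure, wrap it in a view with an aggregation and tag keys, register the view at startup, and record tagged measurements at call sites (recordWithTags in this file). A minimal self-contained sketch of that pattern, using one of the archive metrics as an example (the registration call and error handling here are illustrative, not taken from the diff):

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	// Measure and tag key, mirroring ArchiveErrors and ErrorType above.
	archiveErrors = stats.Int64("gowaku_archive_errors", "Number of errors in archive protocol", stats.UnitDimensionless)
	errorType     = tag.MustNewKey("error_type")

	// View that counts the measure, broken down by error type.
	archiveErrorTypesView = &view.View{
		Name:        "gowaku_archive_errors",
		Measure:     archiveErrors,
		Description: "Number of errors in archive protocol",
		Aggregation: view.Count(),
		TagKeys:     []tag.Key{errorType},
	}
)

func main() {
	// Views must be registered before their measurements are exported.
	if err := view.Register(archiveErrorTypesView); err != nil {
		log.Fatal(err)
	}

	// Record one error, tagged with its type, as recordWithTags does in the diff.
	if err := stats.RecordWithTags(context.Background(),
		[]tag.Mutator{tag.Insert(errorType, "insert_failure")},
		archiveErrors.M(1)); err != nil {
		log.Println(err)
	}
}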
@@ -110,7 +110,7 @@ func TestWakuFilter(t *testing.T) {
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -127,7 +127,7 @@ func TestWakuFilter(t *testing.T) {
}
}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -149,7 +149,7 @@ func TestWakuFilter(t *testing.T) {

time.Sleep(1 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
wg.Wait()
}
@@ -252,7 +252,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -262,7 +262,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

time.Sleep(1 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

// TODO: find out how to eliminate this sleep
@@ -271,7 +271,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

time.Sleep(2 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

time.Sleep(2 * time.Second)

@@ -109,7 +109,7 @@ func TestWakuFilter(t *testing.T) {
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -126,7 +126,7 @@ func TestWakuFilter(t *testing.T) {
}
}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -148,7 +148,7 @@ func TestWakuFilter(t *testing.T) {

time.Sleep(1 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
wg.Wait()
}
@@ -210,7 +210,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

}()

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

wg.Wait()
@@ -220,7 +220,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

time.Sleep(1 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

// TODO: find out how to eliminate this sleep
@@ -229,7 +229,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {

time.Sleep(3 * time.Second)

_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)

time.Sleep(1 * time.Second)
@@ -87,8 +87,8 @@ func TestWakuLightPush(t *testing.T) {
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err)

msg1 := tests.CreateWakuMessage("test1", 0)
msg2 := tests.CreateWakuMessage("test2", 1)
msg1 := tests.CreateWakuMessage("test1", utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch())

req := new(pb.PushRequest)
req.Message = msg1
@@ -147,6 +147,6 @@ func TestWakuLightPushNoPeers(t *testing.T) {
require.NoError(t, err)
client := NewWakuLightPush(nil, utils.Logger())
client.SetHost(clientHost)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic)
require.Errorf(t, err, "no suitable remote peers")
}
@@ -16,11 +16,12 @@ import (
)

func TestFindLastSeenMessage(t *testing.T) {
msg1 := protocol.NewEnvelope(tests.CreateWakuMessage("1", 1), utils.GetUnixEpoch(), "test")
msg2 := protocol.NewEnvelope(tests.CreateWakuMessage("2", 2), utils.GetUnixEpoch(), "test")
msg3 := protocol.NewEnvelope(tests.CreateWakuMessage("3", 3), utils.GetUnixEpoch(), "test")
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test")
now := utils.GetUnixEpoch()
msg1 := protocol.NewEnvelope(tests.CreateWakuMessage("1", now+1), utils.GetUnixEpoch(), "test")
msg2 := protocol.NewEnvelope(tests.CreateWakuMessage("2", now+2), utils.GetUnixEpoch(), "test")
msg3 := protocol.NewEnvelope(tests.CreateWakuMessage("3", now+3), utils.GetUnixEpoch(), "test")
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", now+4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", now+5), utils.GetUnixEpoch(), "test")

s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(msg1)
@@ -49,13 +50,14 @@ func TestResume(t *testing.T) {

defer s1.Stop()

now := utils.GetUnixEpoch()
for i := 0; i < 10; i++ {
var contentTopic = "1"
if i%2 == 0 {
contentTopic = "2"
}

wakuMessage := tests.CreateWakuMessage(contentTopic, int64(i+1))
wakuMessage := tests.CreateWakuMessage(contentTopic, now+int64(i+1))
msg := protocol.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), "test")
_ = s1.storeMessage(msg)
}
@@ -108,7 +110,7 @@ func TestResumeWithListOfPeers(t *testing.T) {

defer s1.Stop()

msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0}
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: utils.GetUnixEpoch()}

_ = s1.storeMessage(protocol.NewEnvelope(msg0, utils.GetUnixEpoch(), "test"))
@@ -223,8 +223,6 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
}, nil
}

metrics.RecordStoreMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))

return historyResponseRPC.Response, nil
}

@@ -39,8 +39,6 @@ var (

// ErrFailedQuery is emitted when the query fails to return results
ErrFailedQuery = errors.New("failed to resolve the query")

ErrFutureMessage = errors.New("message timestamp in the future")
)

type WakuSwap interface {
@@ -22,9 +22,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
)

// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second

func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) {
if query.PagingInfo == nil {
query.PagingInfo = &pb.PagingInfo{
@@ -78,6 +75,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse
type MessageProvider interface {
GetAll() ([]persistence.StoredMessage, error)
Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error)
Validate(env *protocol.Envelope) error
Put(env *protocol.Envelope) error
MostRecentTimestamp() (int64, error)
Start(ctx context.Context, timesource timesource.Timesource) error
@@ -129,9 +127,8 @@ func (store *WakuStore) Start(ctx context.Context) error {

store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)

store.wg.Add(2)
store.wg.Add(1)
go store.storeIncomingMessages(store.ctx)
go store.updateMetrics(store.ctx)

store.log.Info("Store protocol started")

@@ -139,16 +136,17 @@ func (store *WakuStore) Start(ctx context.Context) error {
}

func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
// Ensure that messages don't "jump" to the front of the queue with future timestamps
if env.Index().SenderTime-env.Index().ReceiverTime > int64(MaxTimeVariance) {
return ErrFutureMessage
}

if env.Message().Ephemeral {
return nil
}

err := store.msgProvider.Put(env)
err := store.msgProvider.Validate(env)
if err != nil {
return err
}

err = store.msgProvider.Put(env)
if err != nil {
store.log.Error("storing message", zap.Error(err))
metrics.RecordStoreError(store.ctx, "store_failure")
@@ -167,26 +165,6 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
}
}

func (store *WakuStore) updateMetrics(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
defer store.wg.Done()

for {
select {
case <-ticker.C:
msgCount, err := store.msgProvider.Count()
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordStoreMessage(store.ctx, "stored", msgCount)
}
case <-ctx.Done():
return
}
}
}

func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
@@ -122,11 +122,12 @@ func TestWakuStoreProtocolNext(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
msg4 := tests.CreateWakuMessage(topic1, 4)
msg5 := tests.CreateWakuMessage(topic1, 5)
now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)

s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
@@ -194,11 +195,12 @@ func TestWakuStoreResult(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
msg4 := tests.CreateWakuMessage(topic1, 4)
msg5 := tests.CreateWakuMessage(topic1, 5)
now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)

s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
@@ -282,15 +284,16 @@ func TestWakuStoreProtocolFind(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
msg4 := tests.CreateWakuMessage(topic1, 4)
msg5 := tests.CreateWakuMessage(topic1, 5)
msg6 := tests.CreateWakuMessage(topic1, 6)
msg7 := tests.CreateWakuMessage("hello", 7)
msg8 := tests.CreateWakuMessage(topic1, 8)
msg9 := tests.CreateWakuMessage(topic1, 9)
now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)
msg6 := tests.CreateWakuMessage(topic1, now+6)
msg7 := tests.CreateWakuMessage("hello", now+7)
msg8 := tests.CreateWakuMessage(topic1, now+8)
msg9 := tests.CreateWakuMessage(topic1, now+9)

s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
@@ -206,12 +206,13 @@ func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())

var messages []*wpb.WakuMessage
now := utils.GetUnixEpoch()
for i := 0; i < 10; i++ {
contentTopic := "1"
if i%2 == 0 {
contentTopic = "2"
}
msg := tests.CreateWakuMessage(contentTopic, int64(i))
msg := tests.CreateWakuMessage(contentTopic, now+int64(i))
_ = s.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), "test"))
messages = append(messages, msg)
}
@@ -219,8 +220,8 @@ func TestTemporalHistoryQueries(t *testing.T) {
// handle temporal history query with a valid time window
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: int64(2),
EndTime: int64(5),
StartTime: now + 2,
EndTime: now + 5,
})

require.Len(t, response.Messages, 2)
@@ -230,8 +231,8 @@ func TestTemporalHistoryQueries(t *testing.T) {
// handle temporal history query with a zero-size time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: int64(2),
EndTime: int64(2),
StartTime: now + 2,
EndTime: now + 2,
})

require.Len(t, response.Messages, 0)
@@ -239,8 +240,8 @@ func TestTemporalHistoryQueries(t *testing.T) {
// handle temporal history query with an invalid time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: int64(5),
EndTime: int64(2),
StartTime: now + 5,
EndTime: now + 2,
})
// time window is invalid since start time > end time
// perhaps it should return an error?
@@ -70,16 +70,17 @@ func TestRelaySubscription(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test max messages in subscription
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 1), 1, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 2), 2, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 3), 3, "test"))
now := utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test"))

// Wait for the messages to be processed
time.Sleep(500 * time.Millisecond)

require.Len(t, d.messages["test"], 3)

d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", 4), 4, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test"))

time.Sleep(500 * time.Millisecond)
@@ -32,9 +32,10 @@ func TestGetMessages(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)

node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)