From 21a28e9b53bc18ff7facdca2e77d377f594f5871 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 25 Apr 2023 14:48:47 -0400 Subject: [PATCH] feat: record message size --- waku/metrics/http.go | 1 + waku/persistence/store.go | 5 +++-- waku/v2/metrics/metrics.go | 12 ++++++++++-- waku/v2/protocol/relay/waku_relay.go | 4 +++- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/waku/metrics/http.go b/waku/metrics/http.go index a8e8c95a..463d09d6 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -51,6 +51,7 @@ func NewMetricsServer(address string, port int, log *zap.Logger) *Server { // Register the views if err := view.Register( metrics.MessageView, + metrics.MessageSizeView, metrics.LegacyFilterErrorTypesView, metrics.LegacyFilterMessagesView, metrics.LegacyFilterSubscribersView, diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 840456a3..cabc2cbf 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "go.opencensus.io/stats" "go.uber.org/zap" ) @@ -289,7 +290,7 @@ func (d *DBStore) Put(env *protocol.Envelope) error { return err } ellapsed := time.Since(start) - metrics.ArchiveInsertDurationSeconds.M(int64(ellapsed.Seconds())) + stats.Record(metrics.ArchiveInsertDurationSeconds.M(int64(ellapsed.Seconds()))) err = stmt.Close() if err != nil { @@ -410,7 +411,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err return nil, nil, err } ellapsed := time.Since(measurementStart) - metrics.ArchiveQueryDurationSeconds.M(int64(ellapsed.Seconds())) + stats.Record(metrics.ArchiveQueryDurationSeconds.M(int64(ellapsed.Seconds()))) var result []StoredMessage for rows.Next() { diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 29f76129..13d98295 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -15,8 +15,10 @@ import ( var ( WakuVersion = stats.Int64("waku_version", "", stats.UnitDimensionless) Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless) - Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) - Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless) + MessageSize = stats.Int64("waku_histogram_message_size", "message size histogram in kB", stats.UnitDimensionless) + + Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) + Dials = stats.Int64("dials", "Number of peer dials", stats.UnitDimensionless) LegacyFilterMessages = stats.Int64("legacy_filter_messages", "Number of legacy filter messages", stats.UnitDimensionless) LegacyFilterSubscribers = stats.Int64("legacy_filter_subscribers", "Number of legacy filter subscribers", stats.UnitDimensionless) @@ -74,6 +76,12 @@ var ( Description: "The number of the messages received", Aggregation: view.Count(), } + MessageSizeView = &view.View{ + Name: "gowaku_histogram_message_size", + Measure: MessageSize, + Description: "message size histogram in kB", + Aggregation: view.Distribution(0.0, 5.0, 15.0, 50.0, 100.0, 300.0, 700.0, 1000.0), + } StoreQueriesView = &view.View{ Name: "gowaku_store_queries", diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 9233840c..a0936da5 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -384,13 +384,15 @@ func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string if msg == nil { return } - stats.Record(ctx, metrics.Messages.M(1)) wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { w.log.Error("decoding message", zap.Error(err)) return } + msgSizeInKb := len(wakuMessage.Payload) / 1000 + stats.Record(ctx, metrics.Messages.M(1), metrics.MessageSize.M(int64(msgSizeInKb))) + envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic) w.log.Debug("waku.relay received", logging.HexString("hash", envelope.Hash()))