From 4041f5a67ae87709d0b79aa8ae997d6023609e7b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 28 Jul 2022 15:17:12 -0400 Subject: [PATCH] fix: store recorded messages metric --- waku/persistence/store.go | 10 ++++++++++ waku/v2/protocol/store/waku_store.go | 29 +++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index a83faf62..b1f8d44b 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -334,6 +334,16 @@ func (d *DBStore) MostRecentTimestamp() (int64, error) { return result.Int64, nil } +// Count returns the number of rows in the message table +func (d *DBStore) Count() (int, error) { + var result int + err := d.db.QueryRow(`SELECT COUNT(*) FROM message`).Scan(&result) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + return result, nil +} + // GetAll returns all the stored WakuMessages func (d *DBStore) GetAll() ([]StoredMessage, error) { start := time.Now() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 749bd1ea..158b2d8e 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -112,6 +112,7 @@ type MessageProvider interface { Put(env *protocol.Envelope) error MostRecentTimestamp() (int64, error) Stop() + Count() (int, error) } type Query struct { Topic string @@ -149,6 +150,7 @@ type WakuStore struct { log *zap.Logger started bool + quit chan struct{} msgProvider MessageProvider h host.Host @@ -172,6 +174,8 @@ func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNum wakuStore.swap = swap wakuStore.wg = &sync.WaitGroup{} wakuStore.log = log.Named("store") + wakuStore.quit = make(chan struct{}) + return wakuStore } @@ -197,8 +201,9 @@ func (store *WakuStore) Start(ctx context.Context) { store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest) - store.wg.Add(1) + store.wg.Add(2) go store.storeIncomingMessages(ctx) + go store.updateMetrics(ctx) store.log.Info("Store protocol started") } @@ -226,6 +231,26 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) { } } +func (store *WakuStore) updateMetrics(ctx context.Context) { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + defer store.wg.Done() + + for { + select { + case <-ticker.C: + msgCount, err := store.msgProvider.Count() + if err != nil { + store.log.Error("updating store metrics", zap.Error(err)) + } else { + metrics.RecordMessage(store.ctx, "stored", msgCount) + } + case <-store.quit: + return + } + } +} + func (store *WakuStore) onRequest(s network.Stream) { defer s.Close() logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer())) @@ -606,6 +631,8 @@ func (store *WakuStore) Stop() { close(store.MsgC) } + store.quit <- struct{}{} + if store.h != nil { store.h.RemoveStreamHandler(StoreID_v20beta4) }