fix: store recorded messages metric

This commit is contained in:
Richard Ramos 2022-07-28 15:17:12 -04:00
parent 1c2c68fbfd
commit 4041f5a67a
No known key found for this signature in database
GPG Key ID: BD36D48BC9FFC88C
2 changed files with 38 additions and 1 deletions

View File

@ -334,6 +334,16 @@ func (d *DBStore) MostRecentTimestamp() (int64, error) {
return result.Int64, nil
}
// Count returns the number of rows in the message table
func (d *DBStore) Count() (int, error) {
var result int
err := d.db.QueryRow(`SELECT COUNT(*) FROM message`).Scan(&result)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return result, nil
}
// GetAll returns all the stored WakuMessages
func (d *DBStore) GetAll() ([]StoredMessage, error) {
start := time.Now()

View File

@ -112,6 +112,7 @@ type MessageProvider interface {
Put(env *protocol.Envelope) error
MostRecentTimestamp() (int64, error)
Stop()
Count() (int, error)
}
type Query struct {
Topic string
@ -149,6 +150,7 @@ type WakuStore struct {
log *zap.Logger
started bool
quit chan struct{}
msgProvider MessageProvider
h host.Host
@ -172,6 +174,8 @@ func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNum
wakuStore.swap = swap
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.quit = make(chan struct{})
return wakuStore
}
@ -197,8 +201,9 @@ func (store *WakuStore) Start(ctx context.Context) {
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
store.wg.Add(1)
store.wg.Add(2)
go store.storeIncomingMessages(ctx)
go store.updateMetrics(ctx)
store.log.Info("Store protocol started")
}
@ -226,6 +231,26 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
}
}
func (store *WakuStore) updateMetrics(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
defer store.wg.Done()
for {
select {
case <-ticker.C:
msgCount, err := store.msgProvider.Count()
if err != nil {
store.log.Error("updating store metrics", zap.Error(err))
} else {
metrics.RecordMessage(store.ctx, "stored", msgCount)
}
case <-store.quit:
return
}
}
}
func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close()
logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
@ -606,6 +631,8 @@ func (store *WakuStore) Stop() {
close(store.MsgC)
}
store.quit <- struct{}{}
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta4)
}