diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index 1525bcd09..2e6c3c4c6 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -93,7 +93,10 @@ func setupTestServer(t *testing.T) *WhisperMailServer { db, _ := leveldb.Open(storage.NewMemStorage(), nil) s.ms = &mailServer{ - db: &LevelDB{ldb: db}, + db: &LevelDB{ + ldb: db, + done: make(chan struct{}), + }, adapter: &whisperAdapter{}, } s.minRequestPoW = powRequirement diff --git a/mailserver/mailserver_db.go b/mailserver/mailserver_db.go index e7435c841..858eb99b6 100644 --- a/mailserver/mailserver_db.go +++ b/mailserver/mailserver_db.go @@ -6,6 +6,9 @@ import ( "github.com/status-im/status-go/eth-node/types" ) +// every this many seconds check real envelopes count +const envelopeCountCheckInterval = 60 + // DB is an interface to abstract interactions with the db so that the mailserver // is agnostic to the underlying technology used type DB interface { diff --git a/mailserver/mailserver_db_leveldb.go b/mailserver/mailserver_db_leveldb.go index fe38ce557..eaace3bfc 100644 --- a/mailserver/mailserver_db_leveldb.go +++ b/mailserver/mailserver_db_leveldb.go @@ -18,7 +18,9 @@ import ( type LevelDB struct { // We can't embed as there are some state problems with go-routines - ldb *leveldb.DB + ldb *leveldb.DB + name string + done chan struct{} } type LevelDBIterator struct { @@ -68,7 +70,41 @@ func NewLevelDB(dataDir string) (*LevelDB, error) { log.Info("database is corrupted trying to recover", "path", dataDir) db, err = leveldb.RecoverFile(dataDir, nil) } - return &LevelDB{ldb: db}, err + + instance := LevelDB{ + ldb: db, + name: dataDir, // name is used for metrics labels + done: make(chan struct{}), + } + + // initialize the metric value + instance.updateArchivedEnvelopesCount() + // checking count on every insert is inefficient + go func() { + for { + select { + case <-instance.done: + return + case <-time.After(time.Second * envelopeCountCheckInterval): + instance.updateArchivedEnvelopesCount() + } + } + }() + return &instance, err +} + +// GetEnvelope get an envelope by its key +func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) { + defer recoverLevelDBPanics("GetEnvelope") + return db.ldb.Get(key.Bytes(), nil) +} + +func (db *LevelDB) updateArchivedEnvelopesCount() { + if count, err := db.envelopesCount(); err != nil { + log.Warn("db query for envelopes count failed", "err", err) + } else { + archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count)) + } } // Build iterator returns an iterator given a start/end and a cursor @@ -83,13 +119,6 @@ func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) { return &LevelDBIterator{i}, nil } -// GetEnvelope get an envelope by its key -func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) { - defer recoverLevelDBPanics("GetEnvelope") - - return db.ldb.Get(key.Bytes(), nil) -} - // Prune removes envelopes older than time func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) { defer recoverLevelDBPanics("Prune") @@ -140,6 +169,20 @@ func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) { return removed, nil } +func (db *LevelDB) envelopesCount() (int, error) { + defer recoverLevelDBPanics("envelopesCount") + iterator, err := db.BuildIterator(CursorQuery{}) + if err != nil { + return 0, err + } + // LevelDB does not have API for getting a count + var count int + for iterator.Next() { + count++ + } + return count, nil +} + // SaveEnvelope stores an envelope in leveldb and increments the metrics func (db *LevelDB) SaveEnvelope(env types.Envelope) error { defer recoverLevelDBPanics("SaveEnvelope") @@ -148,20 +191,26 @@ func (db *LevelDB) SaveEnvelope(env types.Envelope) error { rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap()) if err != nil { log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) - archivedErrorsCounter.Inc() + archivedErrorsCounter.WithLabelValues(db.name).Inc() return err } if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil { log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) - archivedErrorsCounter.Inc() + archivedErrorsCounter.WithLabelValues(db.name).Inc() } - archivedEnvelopesCounter.Inc() - archivedEnvelopeSizeMeter.Observe(float64(whisper.EnvelopeHeaderLength + env.Size())) + archivedEnvelopesGauge.WithLabelValues(db.name).Inc() + archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe( + float64(whisper.EnvelopeHeaderLength + env.Size())) return err } func (db *LevelDB) Close() error { + select { + case <-db.done: + default: + close(db.done) + } return db.ldb.Close() } diff --git a/mailserver/mailserver_db_postgres.go b/mailserver/mailserver_db_postgres.go index d4f2596a5..5892abdaf 100644 --- a/mailserver/mailserver_db_postgres.go +++ b/mailserver/mailserver_db_postgres.go @@ -24,7 +24,9 @@ import ( ) type PostgresDB struct { - db *sql.DB + db *sql.DB + name string + done chan struct{} } func NewPostgresDB(uri string) (*PostgresDB, error) { @@ -33,11 +35,32 @@ func NewPostgresDB(uri string) (*PostgresDB, error) { return nil, err } - instance := &PostgresDB{db: db} + instance := &PostgresDB{ + db: db, + done: make(chan struct{}), + } if err := instance.setup(); err != nil { return nil, err } + // name is used for metrics labels + if name, err := instance.getDBName(uri); err == nil { + instance.name = name + } + + // initialize the metric value + instance.updateArchivedEnvelopesCount() + // checking count on every insert is inefficient + go func() { + for { + select { + case <-instance.done: + return + case <-time.After(time.Second * envelopeCountCheckInterval): + instance.updateArchivedEnvelopesCount() + } + } + }() return instance, nil } @@ -45,6 +68,26 @@ type postgresIterator struct { *sql.Rows } +func (i *PostgresDB) getDBName(uri string) (string, error) { + query := "SELECT current_database()" + var dbName string + return dbName, i.db.QueryRow(query).Scan(&dbName) +} + +func (i *PostgresDB) envelopesCount() (int, error) { + query := "SELECT count(*) FROM envelopes" + var count int + return count, i.db.QueryRow(query).Scan(&count) +} + +func (i *PostgresDB) updateArchivedEnvelopesCount() { + if count, err := i.envelopesCount(); err != nil { + log.Warn("db query for envelopes count failed", "err", err) + } else { + archivedEnvelopesGauge.WithLabelValues(i.name).Set(float64(count)) + } +} + func (i *postgresIterator) DBKey() (*DBKey, error) { var value []byte var id []byte @@ -145,6 +188,11 @@ func (i *PostgresDB) setup() error { } func (i *PostgresDB) Close() error { + select { + case <-i.done: + default: + close(i.done) + } return i.db.Close() } @@ -196,11 +244,11 @@ func (i *PostgresDB) SaveEnvelope(env types.Envelope) error { rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap()) if err != nil { log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) - archivedErrorsCounter.Inc() + archivedErrorsCounter.WithLabelValues(i.name).Inc() return err } if rawEnvelope == nil { - archivedErrorsCounter.Inc() + archivedErrorsCounter.WithLabelValues(i.name).Inc() return errors.New("failed to encode envelope to bytes") } @@ -220,12 +268,13 @@ func (i *PostgresDB) SaveEnvelope(env types.Envelope) error { ) if err != nil { - archivedErrorsCounter.Inc() + archivedErrorsCounter.WithLabelValues(i.name).Inc() return err } - archivedEnvelopesCounter.Inc() - archivedEnvelopeSizeMeter.Observe(float64(whisper.EnvelopeHeaderLength + env.Size())) + archivedEnvelopesGauge.WithLabelValues(i.name).Inc() + archivedEnvelopeSizeMeter.WithLabelValues(i.name).Observe( + float64(whisper.EnvelopeHeaderLength + env.Size())) return nil } diff --git a/mailserver/metrics.go b/mailserver/metrics.go index 7571a6ab6..f574c9c1e 100644 --- a/mailserver/metrics.go +++ b/mailserver/metrics.go @@ -43,23 +43,23 @@ var ( Help: "Size of processed Whisper envelopes in bytes.", Buckets: prom.ExponentialBuckets(1024, 4, 10), }) - archivedErrorsCounter = prom.NewCounter(prom.CounterOpts{ - Name: "mailserver_archived_envelopes_falures_total", - Help: "Number of failures storing a Whisper envelope.", - }) - archivedEnvelopesCounter = prom.NewCounter(prom.CounterOpts{ - Name: "mailserver_archived_envelopes_total", - Help: "Number of envelopes saved.", - }) - archivedEnvelopeSizeMeter = prom.NewHistogram(prom.HistogramOpts{ - Name: "mailserver_archived_envelope_size_bytes", - Help: "Size of envelopes saved.", - Buckets: prom.ExponentialBuckets(1024, 2, 11), - }) mailDeliveryDuration = prom.NewHistogram(prom.HistogramOpts{ Name: "mailserver_delivery_duration_seconds", Help: "Time it takes to deliver messages to a Whisper peer.", }) + archivedErrorsCounter = prom.NewCounterVec(prom.CounterOpts{ + Name: "mailserver_archived_envelopes_falures_total", + Help: "Number of failures storing a Whisper envelope.", + }, []string{"db"}) + archivedEnvelopesGauge = prom.NewGaugeVec(prom.GaugeOpts{ + Name: "mailserver_archived_envelopes_total", + Help: "Number of envelopes saved in the DB.", + }, []string{"db"}) + archivedEnvelopeSizeMeter = prom.NewHistogramVec(prom.HistogramOpts{ + Name: "mailserver_archived_envelope_size_bytes", + Help: "Size of envelopes saved.", + Buckets: prom.ExponentialBuckets(1024, 2, 11), + }, []string{"db"}) ) func init() { @@ -72,8 +72,8 @@ func init() { prom.MustRegister(syncAttemptsCounter) prom.MustRegister(sendRawEnvelopeDuration) prom.MustRegister(sentEnvelopeBatchSizeMeter) - prom.MustRegister(archivedErrorsCounter) - prom.MustRegister(archivedEnvelopesCounter) - prom.MustRegister(archivedEnvelopeSizeMeter) prom.MustRegister(mailDeliveryDuration) + prom.MustRegister(archivedErrorsCounter) + prom.MustRegister(archivedEnvelopesGauge) + prom.MustRegister(archivedEnvelopeSizeMeter) }