set archived_envelopes_total on start based on DB count

Before the count was reset every time process was restarted.
Also add 'db' label to distinguish between waku and whisper DBs.

Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
Jakub Sokołowski 2020-04-07 10:21:39 +02:00 committed by Jakub
parent 42baf2251d
commit c8d20f84f8
5 changed files with 141 additions and 37 deletions

View File

@ -93,7 +93,10 @@ func setupTestServer(t *testing.T) *WhisperMailServer {
db, _ := leveldb.Open(storage.NewMemStorage(), nil) db, _ := leveldb.Open(storage.NewMemStorage(), nil)
s.ms = &mailServer{ s.ms = &mailServer{
db: &LevelDB{ldb: db}, db: &LevelDB{
ldb: db,
done: make(chan struct{}),
},
adapter: &whisperAdapter{}, adapter: &whisperAdapter{},
} }
s.minRequestPoW = powRequirement s.minRequestPoW = powRequirement

View File

@ -6,6 +6,9 @@ import (
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
) )
// every this many seconds check real envelopes count
const envelopeCountCheckInterval = 60
// DB is an interface to abstract interactions with the db so that the mailserver // DB is an interface to abstract interactions with the db so that the mailserver
// is agnostic to the underlying technology used // is agnostic to the underlying technology used
type DB interface { type DB interface {

View File

@ -18,7 +18,9 @@ import (
type LevelDB struct { type LevelDB struct {
// We can't embed as there are some state problems with go-routines // We can't embed as there are some state problems with go-routines
ldb *leveldb.DB ldb *leveldb.DB
name string
done chan struct{}
} }
type LevelDBIterator struct { type LevelDBIterator struct {
@ -68,7 +70,41 @@ func NewLevelDB(dataDir string) (*LevelDB, error) {
log.Info("database is corrupted trying to recover", "path", dataDir) log.Info("database is corrupted trying to recover", "path", dataDir)
db, err = leveldb.RecoverFile(dataDir, nil) db, err = leveldb.RecoverFile(dataDir, nil)
} }
return &LevelDB{ldb: db}, err
instance := LevelDB{
ldb: db,
name: dataDir, // name is used for metrics labels
done: make(chan struct{}),
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return &instance, err
}
// GetEnvelope get an envelope by its key
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
defer recoverLevelDBPanics("GetEnvelope")
return db.ldb.Get(key.Bytes(), nil)
}
func (db *LevelDB) updateArchivedEnvelopesCount() {
if count, err := db.envelopesCount(); err != nil {
log.Warn("db query for envelopes count failed", "err", err)
} else {
archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
}
} }
// Build iterator returns an iterator given a start/end and a cursor // Build iterator returns an iterator given a start/end and a cursor
@ -83,13 +119,6 @@ func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
return &LevelDBIterator{i}, nil return &LevelDBIterator{i}, nil
} }
// GetEnvelope get an envelope by its key
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
defer recoverLevelDBPanics("GetEnvelope")
return db.ldb.Get(key.Bytes(), nil)
}
// Prune removes envelopes older than time // Prune removes envelopes older than time
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) { func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
defer recoverLevelDBPanics("Prune") defer recoverLevelDBPanics("Prune")
@ -140,6 +169,20 @@ func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
return removed, nil return removed, nil
} }
func (db *LevelDB) envelopesCount() (int, error) {
defer recoverLevelDBPanics("envelopesCount")
iterator, err := db.BuildIterator(CursorQuery{})
if err != nil {
return 0, err
}
// LevelDB does not have API for getting a count
var count int
for iterator.Next() {
count++
}
return count, nil
}
// SaveEnvelope stores an envelope in leveldb and increments the metrics // SaveEnvelope stores an envelope in leveldb and increments the metrics
func (db *LevelDB) SaveEnvelope(env types.Envelope) error { func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
defer recoverLevelDBPanics("SaveEnvelope") defer recoverLevelDBPanics("SaveEnvelope")
@ -148,20 +191,26 @@ func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap()) rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil { if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.Inc() archivedErrorsCounter.WithLabelValues(db.name).Inc()
return err return err
} }
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil { if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
archivedErrorsCounter.Inc() archivedErrorsCounter.WithLabelValues(db.name).Inc()
} }
archivedEnvelopesCounter.Inc() archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
archivedEnvelopeSizeMeter.Observe(float64(whisper.EnvelopeHeaderLength + env.Size())) archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
float64(whisper.EnvelopeHeaderLength + env.Size()))
return err return err
} }
func (db *LevelDB) Close() error { func (db *LevelDB) Close() error {
select {
case <-db.done:
default:
close(db.done)
}
return db.ldb.Close() return db.ldb.Close()
} }

View File

@ -24,7 +24,9 @@ import (
) )
type PostgresDB struct { type PostgresDB struct {
db *sql.DB db *sql.DB
name string
done chan struct{}
} }
func NewPostgresDB(uri string) (*PostgresDB, error) { func NewPostgresDB(uri string) (*PostgresDB, error) {
@ -33,11 +35,32 @@ func NewPostgresDB(uri string) (*PostgresDB, error) {
return nil, err return nil, err
} }
instance := &PostgresDB{db: db} instance := &PostgresDB{
db: db,
done: make(chan struct{}),
}
if err := instance.setup(); err != nil { if err := instance.setup(); err != nil {
return nil, err return nil, err
} }
// name is used for metrics labels
if name, err := instance.getDBName(uri); err == nil {
instance.name = name
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return instance, nil return instance, nil
} }
@ -45,6 +68,26 @@ type postgresIterator struct {
*sql.Rows *sql.Rows
} }
func (i *PostgresDB) getDBName(uri string) (string, error) {
query := "SELECT current_database()"
var dbName string
return dbName, i.db.QueryRow(query).Scan(&dbName)
}
func (i *PostgresDB) envelopesCount() (int, error) {
query := "SELECT count(*) FROM envelopes"
var count int
return count, i.db.QueryRow(query).Scan(&count)
}
func (i *PostgresDB) updateArchivedEnvelopesCount() {
if count, err := i.envelopesCount(); err != nil {
log.Warn("db query for envelopes count failed", "err", err)
} else {
archivedEnvelopesGauge.WithLabelValues(i.name).Set(float64(count))
}
}
func (i *postgresIterator) DBKey() (*DBKey, error) { func (i *postgresIterator) DBKey() (*DBKey, error) {
var value []byte var value []byte
var id []byte var id []byte
@ -145,6 +188,11 @@ func (i *PostgresDB) setup() error {
} }
func (i *PostgresDB) Close() error { func (i *PostgresDB) Close() error {
select {
case <-i.done:
default:
close(i.done)
}
return i.db.Close() return i.db.Close()
} }
@ -196,11 +244,11 @@ func (i *PostgresDB) SaveEnvelope(env types.Envelope) error {
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap()) rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil { if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.Inc() archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err return err
} }
if rawEnvelope == nil { if rawEnvelope == nil {
archivedErrorsCounter.Inc() archivedErrorsCounter.WithLabelValues(i.name).Inc()
return errors.New("failed to encode envelope to bytes") return errors.New("failed to encode envelope to bytes")
} }
@ -220,12 +268,13 @@ func (i *PostgresDB) SaveEnvelope(env types.Envelope) error {
) )
if err != nil { if err != nil {
archivedErrorsCounter.Inc() archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err return err
} }
archivedEnvelopesCounter.Inc() archivedEnvelopesGauge.WithLabelValues(i.name).Inc()
archivedEnvelopeSizeMeter.Observe(float64(whisper.EnvelopeHeaderLength + env.Size())) archivedEnvelopeSizeMeter.WithLabelValues(i.name).Observe(
float64(whisper.EnvelopeHeaderLength + env.Size()))
return nil return nil
} }

View File

@ -43,23 +43,23 @@ var (
Help: "Size of processed Whisper envelopes in bytes.", Help: "Size of processed Whisper envelopes in bytes.",
Buckets: prom.ExponentialBuckets(1024, 4, 10), Buckets: prom.ExponentialBuckets(1024, 4, 10),
}) })
archivedErrorsCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_archived_envelopes_falures_total",
Help: "Number of failures storing a Whisper envelope.",
})
archivedEnvelopesCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_archived_envelopes_total",
Help: "Number of envelopes saved.",
})
archivedEnvelopeSizeMeter = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_archived_envelope_size_bytes",
Help: "Size of envelopes saved.",
Buckets: prom.ExponentialBuckets(1024, 2, 11),
})
mailDeliveryDuration = prom.NewHistogram(prom.HistogramOpts{ mailDeliveryDuration = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_delivery_duration_seconds", Name: "mailserver_delivery_duration_seconds",
Help: "Time it takes to deliver messages to a Whisper peer.", Help: "Time it takes to deliver messages to a Whisper peer.",
}) })
archivedErrorsCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "mailserver_archived_envelopes_falures_total",
Help: "Number of failures storing a Whisper envelope.",
}, []string{"db"})
archivedEnvelopesGauge = prom.NewGaugeVec(prom.GaugeOpts{
Name: "mailserver_archived_envelopes_total",
Help: "Number of envelopes saved in the DB.",
}, []string{"db"})
archivedEnvelopeSizeMeter = prom.NewHistogramVec(prom.HistogramOpts{
Name: "mailserver_archived_envelope_size_bytes",
Help: "Size of envelopes saved.",
Buckets: prom.ExponentialBuckets(1024, 2, 11),
}, []string{"db"})
) )
func init() { func init() {
@ -72,8 +72,8 @@ func init() {
prom.MustRegister(syncAttemptsCounter) prom.MustRegister(syncAttemptsCounter)
prom.MustRegister(sendRawEnvelopeDuration) prom.MustRegister(sendRawEnvelopeDuration)
prom.MustRegister(sentEnvelopeBatchSizeMeter) prom.MustRegister(sentEnvelopeBatchSizeMeter)
prom.MustRegister(archivedErrorsCounter)
prom.MustRegister(archivedEnvelopesCounter)
prom.MustRegister(archivedEnvelopeSizeMeter)
prom.MustRegister(mailDeliveryDuration) prom.MustRegister(mailDeliveryDuration)
prom.MustRegister(archivedErrorsCounter)
prom.MustRegister(archivedEnvelopesGauge)
prom.MustRegister(archivedEnvelopeSizeMeter)
} }