status-go/mailserver/mailserver_db_leveldb.go

247 lines
5.9 KiB
Go

package mailserver
import (
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/logutils"
waku "github.com/status-im/status-go/waku/common"
)
type LevelDB struct {
// We can't embed as there are some state problems with go-routines
ldb *leveldb.DB
name string
done chan struct{}
}
type LevelDBIterator struct {
iterator.Iterator
}
func (i *LevelDBIterator) DBKey() (*DBKey, error) {
return &DBKey{
raw: i.Key(),
}, nil
}
func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
rawValue := make([]byte, len(i.Value()))
copy(rawValue, i.Value())
key, err := i.DBKey()
if err != nil {
return nil, err
}
if !topics[key.Topic()] {
return nil, nil
}
return rawValue, nil
}
func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
var envelopeBloom []byte
rawValue := make([]byte, len(i.Value()))
copy(rawValue, i.Value())
key, err := i.DBKey()
if err != nil {
return nil, err
}
if len(key.Bytes()) != DBKeyLength {
var err error
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
if err != nil {
return nil, err
}
} else {
envelopeBloom = types.TopicToBloom(key.Topic())
}
if !types.BloomFilterMatch(bloom, envelopeBloom) {
return nil, nil
}
return rawValue, nil
}
func (i *LevelDBIterator) Release() error {
i.Iterator.Release()
return nil
}
func NewLevelDB(dataDir string) (*LevelDB, error) {
// Open opens an existing leveldb database
db, err := leveldb.OpenFile(dataDir, nil)
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
logutils.ZapLogger().Info("database is corrupted trying to recover", zap.String("path", dataDir))
db, err = leveldb.RecoverFile(dataDir, nil)
}
instance := LevelDB{
ldb: db,
name: dataDir, // name is used for metrics labels
done: make(chan struct{}),
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
defer common.LogOnPanic()
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return &instance, err
}
// GetEnvelope get an envelope by its key
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
defer recoverLevelDBPanics("GetEnvelope")
return db.ldb.Get(key.Bytes(), nil)
}
func (db *LevelDB) updateArchivedEnvelopesCount() {
if count, err := db.envelopesCount(); err != nil {
logutils.ZapLogger().Warn("db query for envelopes count failed", zap.Error(err))
} else {
archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
}
}
// Build iterator returns an iterator given a start/end and a cursor
func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
defer recoverLevelDBPanics("BuildIterator")
i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)
envelopeQueriesCounter.WithLabelValues("unknown", "unknown").Inc()
// seek to the end as we want to return envelopes in a descending order
if len(query.cursor) == CursorLength {
i.Seek(query.cursor)
}
return &LevelDBIterator{i}, nil
}
// Prune removes envelopes older than time
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
defer recoverLevelDBPanics("Prune")
var zero types.Hash
var emptyTopic types.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
query := CursorQuery{
start: kl.Bytes(),
end: ku.Bytes(),
}
i, err := db.BuildIterator(query)
if err != nil {
return 0, err
}
defer func() { _ = i.Release() }()
batch := leveldb.Batch{}
removed := 0
for i.Next() {
dbKey, err := i.DBKey()
if err != nil {
return 0, err
}
batch.Delete(dbKey.Bytes())
if batch.Len() == batchSize {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
batch.Reset()
}
}
if batch.Len() > 0 {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
}
return removed, nil
}
func (db *LevelDB) envelopesCount() (int, error) {
defer recoverLevelDBPanics("envelopesCount")
iterator, err := db.BuildIterator(CursorQuery{})
if err != nil {
return 0, err
}
// LevelDB does not have API for getting a count
var count int
for iterator.Next() {
count++
}
return count, nil
}
// SaveEnvelope stores an envelope in leveldb and increments the metrics
func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
defer recoverLevelDBPanics("SaveEnvelope")
key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil {
logutils.ZapLogger().Error("rlp.EncodeToBytes failed", zap.Error(err))
archivedErrorsCounter.WithLabelValues(db.name).Inc()
return err
}
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
logutils.ZapLogger().Error("writing to DB failed", zap.Error(err))
archivedErrorsCounter.WithLabelValues(db.name).Inc()
}
archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
float64(waku.EnvelopeHeaderLength + env.Size()))
return err
}
func (db *LevelDB) Close() error {
select {
case <-db.done:
default:
close(db.done)
}
return db.ldb.Close()
}
func recoverLevelDBPanics(calleMethodName string) {
// Recover from possible goleveldb panics
if r := recover(); r != nil {
if errString, ok := r.(string); ok {
logutils.ZapLogger().Error("recovered from panic",
zap.String("calleMethodName", calleMethodName),
zap.String("errString", errString))
}
}
}