247 lines
5.9 KiB
Go
247 lines
5.9 KiB
Go
package mailserver
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
|
"github.com/status-im/status-go/common"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/logutils"
|
|
waku "github.com/status-im/status-go/waku/common"
|
|
)
|
|
|
|
type LevelDB struct {
|
|
// We can't embed as there are some state problems with go-routines
|
|
ldb *leveldb.DB
|
|
name string
|
|
done chan struct{}
|
|
}
|
|
|
|
type LevelDBIterator struct {
|
|
iterator.Iterator
|
|
}
|
|
|
|
func (i *LevelDBIterator) DBKey() (*DBKey, error) {
|
|
return &DBKey{
|
|
raw: i.Key(),
|
|
}, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
|
|
rawValue := make([]byte, len(i.Value()))
|
|
copy(rawValue, i.Value())
|
|
|
|
key, err := i.DBKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !topics[key.Topic()] {
|
|
return nil, nil
|
|
}
|
|
|
|
return rawValue, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
|
|
var envelopeBloom []byte
|
|
rawValue := make([]byte, len(i.Value()))
|
|
copy(rawValue, i.Value())
|
|
|
|
key, err := i.DBKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(key.Bytes()) != DBKeyLength {
|
|
var err error
|
|
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
envelopeBloom = types.TopicToBloom(key.Topic())
|
|
}
|
|
if !types.BloomFilterMatch(bloom, envelopeBloom) {
|
|
return nil, nil
|
|
}
|
|
return rawValue, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) Release() error {
|
|
i.Iterator.Release()
|
|
return nil
|
|
}
|
|
|
|
func NewLevelDB(dataDir string) (*LevelDB, error) {
|
|
// Open opens an existing leveldb database
|
|
db, err := leveldb.OpenFile(dataDir, nil)
|
|
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
|
|
logutils.ZapLogger().Info("database is corrupted trying to recover", zap.String("path", dataDir))
|
|
db, err = leveldb.RecoverFile(dataDir, nil)
|
|
}
|
|
|
|
instance := LevelDB{
|
|
ldb: db,
|
|
name: dataDir, // name is used for metrics labels
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
// initialize the metric value
|
|
instance.updateArchivedEnvelopesCount()
|
|
// checking count on every insert is inefficient
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
for {
|
|
select {
|
|
case <-instance.done:
|
|
return
|
|
case <-time.After(time.Second * envelopeCountCheckInterval):
|
|
instance.updateArchivedEnvelopesCount()
|
|
}
|
|
}
|
|
}()
|
|
return &instance, err
|
|
}
|
|
|
|
// GetEnvelope get an envelope by its key
|
|
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
|
|
defer recoverLevelDBPanics("GetEnvelope")
|
|
return db.ldb.Get(key.Bytes(), nil)
|
|
}
|
|
|
|
func (db *LevelDB) updateArchivedEnvelopesCount() {
|
|
if count, err := db.envelopesCount(); err != nil {
|
|
logutils.ZapLogger().Warn("db query for envelopes count failed", zap.Error(err))
|
|
} else {
|
|
archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
|
|
}
|
|
}
|
|
|
|
// Build iterator returns an iterator given a start/end and a cursor
|
|
func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
|
|
defer recoverLevelDBPanics("BuildIterator")
|
|
|
|
i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)
|
|
|
|
envelopeQueriesCounter.WithLabelValues("unknown", "unknown").Inc()
|
|
// seek to the end as we want to return envelopes in a descending order
|
|
if len(query.cursor) == CursorLength {
|
|
i.Seek(query.cursor)
|
|
}
|
|
return &LevelDBIterator{i}, nil
|
|
}
|
|
|
|
// Prune removes envelopes older than time
|
|
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
|
|
defer recoverLevelDBPanics("Prune")
|
|
|
|
var zero types.Hash
|
|
var emptyTopic types.TopicType
|
|
kl := NewDBKey(0, emptyTopic, zero)
|
|
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
|
|
query := CursorQuery{
|
|
start: kl.Bytes(),
|
|
end: ku.Bytes(),
|
|
}
|
|
i, err := db.BuildIterator(query)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer func() { _ = i.Release() }()
|
|
|
|
batch := leveldb.Batch{}
|
|
removed := 0
|
|
|
|
for i.Next() {
|
|
dbKey, err := i.DBKey()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
batch.Delete(dbKey.Bytes())
|
|
|
|
if batch.Len() == batchSize {
|
|
if err := db.ldb.Write(&batch, nil); err != nil {
|
|
return removed, err
|
|
}
|
|
|
|
removed = removed + batch.Len()
|
|
batch.Reset()
|
|
}
|
|
}
|
|
|
|
if batch.Len() > 0 {
|
|
if err := db.ldb.Write(&batch, nil); err != nil {
|
|
return removed, err
|
|
}
|
|
|
|
removed = removed + batch.Len()
|
|
}
|
|
|
|
return removed, nil
|
|
}
|
|
|
|
func (db *LevelDB) envelopesCount() (int, error) {
|
|
defer recoverLevelDBPanics("envelopesCount")
|
|
iterator, err := db.BuildIterator(CursorQuery{})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
// LevelDB does not have API for getting a count
|
|
var count int
|
|
for iterator.Next() {
|
|
count++
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// SaveEnvelope stores an envelope in leveldb and increments the metrics
|
|
func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
|
|
defer recoverLevelDBPanics("SaveEnvelope")
|
|
|
|
key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())
|
|
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
|
|
if err != nil {
|
|
logutils.ZapLogger().Error("rlp.EncodeToBytes failed", zap.Error(err))
|
|
archivedErrorsCounter.WithLabelValues(db.name).Inc()
|
|
return err
|
|
}
|
|
|
|
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
|
|
logutils.ZapLogger().Error("writing to DB failed", zap.Error(err))
|
|
archivedErrorsCounter.WithLabelValues(db.name).Inc()
|
|
}
|
|
archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
|
|
archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
|
|
float64(waku.EnvelopeHeaderLength + env.Size()))
|
|
return err
|
|
}
|
|
|
|
func (db *LevelDB) Close() error {
|
|
select {
|
|
case <-db.done:
|
|
default:
|
|
close(db.done)
|
|
}
|
|
return db.ldb.Close()
|
|
}
|
|
|
|
func recoverLevelDBPanics(calleMethodName string) {
|
|
// Recover from possible goleveldb panics
|
|
if r := recover(); r != nil {
|
|
if errString, ok := r.(string); ok {
|
|
logutils.ZapLogger().Error("recovered from panic",
|
|
zap.String("calleMethodName", calleMethodName),
|
|
zap.String("errString", errString))
|
|
}
|
|
}
|
|
}
|