mirror of
https://github.com/status-im/status-go.git
synced 2025-01-18 10:42:07 +00:00
2c96475aac
Leveldb did not support topics only queries. This commit changes the behavior so that now we use the topics map if present.
241 lines
5.6 KiB
Go
241 lines
5.6 KiB
Go
package mailserver
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
waku "github.com/status-im/status-go/waku/common"
|
|
)
|
|
|
|
type LevelDB struct {
|
|
// We can't embed as there are some state problems with go-routines
|
|
ldb *leveldb.DB
|
|
name string
|
|
done chan struct{}
|
|
}
|
|
|
|
type LevelDBIterator struct {
|
|
iterator.Iterator
|
|
}
|
|
|
|
func (i *LevelDBIterator) DBKey() (*DBKey, error) {
|
|
return &DBKey{
|
|
raw: i.Key(),
|
|
}, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
|
|
rawValue := make([]byte, len(i.Value()))
|
|
copy(rawValue, i.Value())
|
|
|
|
key, err := i.DBKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !topics[key.Topic()] {
|
|
return nil, nil
|
|
}
|
|
|
|
return rawValue, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
|
|
var envelopeBloom []byte
|
|
rawValue := make([]byte, len(i.Value()))
|
|
copy(rawValue, i.Value())
|
|
|
|
key, err := i.DBKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(key.Bytes()) != DBKeyLength {
|
|
var err error
|
|
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
envelopeBloom = types.TopicToBloom(key.Topic())
|
|
}
|
|
if !types.BloomFilterMatch(bloom, envelopeBloom) {
|
|
return nil, nil
|
|
}
|
|
return rawValue, nil
|
|
}
|
|
|
|
func (i *LevelDBIterator) Release() error {
|
|
i.Iterator.Release()
|
|
return nil
|
|
}
|
|
|
|
func NewLevelDB(dataDir string) (*LevelDB, error) {
|
|
// Open opens an existing leveldb database
|
|
db, err := leveldb.OpenFile(dataDir, nil)
|
|
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
|
|
log.Info("database is corrupted trying to recover", "path", dataDir)
|
|
db, err = leveldb.RecoverFile(dataDir, nil)
|
|
}
|
|
|
|
instance := LevelDB{
|
|
ldb: db,
|
|
name: dataDir, // name is used for metrics labels
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
// initialize the metric value
|
|
instance.updateArchivedEnvelopesCount()
|
|
// checking count on every insert is inefficient
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-instance.done:
|
|
return
|
|
case <-time.After(time.Second * envelopeCountCheckInterval):
|
|
instance.updateArchivedEnvelopesCount()
|
|
}
|
|
}
|
|
}()
|
|
return &instance, err
|
|
}
|
|
|
|
// GetEnvelope get an envelope by its key
|
|
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
|
|
defer recoverLevelDBPanics("GetEnvelope")
|
|
return db.ldb.Get(key.Bytes(), nil)
|
|
}
|
|
|
|
func (db *LevelDB) updateArchivedEnvelopesCount() {
|
|
if count, err := db.envelopesCount(); err != nil {
|
|
log.Warn("db query for envelopes count failed", "err", err)
|
|
} else {
|
|
archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
|
|
}
|
|
}
|
|
|
|
// Build iterator returns an iterator given a start/end and a cursor
|
|
func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
|
|
defer recoverLevelDBPanics("BuildIterator")
|
|
|
|
i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)
|
|
// seek to the end as we want to return envelopes in a descending order
|
|
if len(query.cursor) == CursorLength {
|
|
i.Seek(query.cursor)
|
|
}
|
|
return &LevelDBIterator{i}, nil
|
|
}
|
|
|
|
// Prune removes envelopes older than time
|
|
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
|
|
defer recoverLevelDBPanics("Prune")
|
|
|
|
var zero types.Hash
|
|
var emptyTopic types.TopicType
|
|
kl := NewDBKey(0, emptyTopic, zero)
|
|
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
|
|
query := CursorQuery{
|
|
start: kl.Bytes(),
|
|
end: ku.Bytes(),
|
|
}
|
|
i, err := db.BuildIterator(query)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer func() { _ = i.Release() }()
|
|
|
|
batch := leveldb.Batch{}
|
|
removed := 0
|
|
|
|
for i.Next() {
|
|
dbKey, err := i.DBKey()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
batch.Delete(dbKey.Bytes())
|
|
|
|
if batch.Len() == batchSize {
|
|
if err := db.ldb.Write(&batch, nil); err != nil {
|
|
return removed, err
|
|
}
|
|
|
|
removed = removed + batch.Len()
|
|
batch.Reset()
|
|
}
|
|
}
|
|
|
|
if batch.Len() > 0 {
|
|
if err := db.ldb.Write(&batch, nil); err != nil {
|
|
return removed, err
|
|
}
|
|
|
|
removed = removed + batch.Len()
|
|
}
|
|
|
|
return removed, nil
|
|
}
|
|
|
|
func (db *LevelDB) envelopesCount() (int, error) {
|
|
defer recoverLevelDBPanics("envelopesCount")
|
|
iterator, err := db.BuildIterator(CursorQuery{})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
// LevelDB does not have API for getting a count
|
|
var count int
|
|
for iterator.Next() {
|
|
count++
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// SaveEnvelope stores an envelope in leveldb and increments the metrics
|
|
func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
|
|
defer recoverLevelDBPanics("SaveEnvelope")
|
|
|
|
key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())
|
|
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
|
|
archivedErrorsCounter.WithLabelValues(db.name).Inc()
|
|
return err
|
|
}
|
|
|
|
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
|
|
log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
|
|
archivedErrorsCounter.WithLabelValues(db.name).Inc()
|
|
}
|
|
archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
|
|
archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
|
|
float64(waku.EnvelopeHeaderLength + env.Size()))
|
|
return err
|
|
}
|
|
|
|
func (db *LevelDB) Close() error {
|
|
select {
|
|
case <-db.done:
|
|
default:
|
|
close(db.done)
|
|
}
|
|
return db.ldb.Close()
|
|
}
|
|
|
|
func recoverLevelDBPanics(calleMethodName string) {
|
|
// Recover from possible goleveldb panics
|
|
if r := recover(); r != nil {
|
|
if errString, ok := r.(string); ok {
|
|
log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString))
|
|
}
|
|
}
|
|
}
|