[Fixes: #2328] Fixes iterator on leveldb
Leveldb did not support topics only queries. This commit changes the behavior so that now we use the topics map if present.
This commit is contained in:
parent
cd2b53643d
commit
2c96475aac
|
@ -143,7 +143,7 @@ func countMessages(t *testing.T, db DB) int {
|
|||
|
||||
for i.Next() {
|
||||
var env waku.Envelope
|
||||
value, err := i.GetEnvelope(query.bloom)
|
||||
value, err := i.GetEnvelopeByBloomFilter(query.bloom)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -749,6 +749,15 @@ func (s *mailServer) processRequestInBundles(
|
|||
"limit", limit,
|
||||
)
|
||||
|
||||
var topicsMap map[types.TopicType]bool
|
||||
|
||||
if len(topics) != 0 {
|
||||
topicsMap = make(map[types.TopicType]bool)
|
||||
for _, t := range topics {
|
||||
topicsMap[types.BytesToTopic(t)] = true
|
||||
}
|
||||
}
|
||||
|
||||
// We iterate over the envelopes.
|
||||
// We collect envelopes in batches.
|
||||
// If there still room and we haven't reached the limit
|
||||
|
@ -756,7 +765,16 @@ func (s *mailServer) processRequestInBundles(
|
|||
// Otherwise publish what you have so far, reset the bundle to the
|
||||
// current envelope, and leave if we hit the limit
|
||||
for iter.Next() {
|
||||
rawValue, err := iter.GetEnvelope(bloom)
|
||||
var rawValue []byte
|
||||
var err error
|
||||
if len(topicsMap) != 0 {
|
||||
rawValue, err = iter.GetEnvelopeByTopicsMap(topicsMap)
|
||||
|
||||
} else if len(bloom) != 0 {
|
||||
rawValue, err = iter.GetEnvelopeByBloomFilter(bloom)
|
||||
} else {
|
||||
err = errors.New("either topics or bloom must be specified")
|
||||
}
|
||||
if err != nil {
|
||||
log.Error(
|
||||
"[mailserver:processRequestInBundles]Failed to get envelope from iterator",
|
||||
|
@ -765,6 +783,7 @@ func (s *mailServer) processRequestInBundles(
|
|||
)
|
||||
continue
|
||||
}
|
||||
|
||||
if rawValue == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -28,7 +28,8 @@ type Iterator interface {
|
|||
DBKey() (*DBKey, error)
|
||||
Release() error
|
||||
Error() error
|
||||
GetEnvelope(bloom []byte) ([]byte, error)
|
||||
GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error)
|
||||
GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error)
|
||||
}
|
||||
|
||||
type CursorQuery struct {
|
||||
|
|
|
@ -33,7 +33,23 @@ func (i *LevelDBIterator) DBKey() (*DBKey, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (i *LevelDBIterator) GetEnvelope(bloom []byte) ([]byte, error) {
|
||||
func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
|
||||
rawValue := make([]byte, len(i.Value()))
|
||||
copy(rawValue, i.Value())
|
||||
|
||||
key, err := i.DBKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !topics[key.Topic()] {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return rawValue, nil
|
||||
}
|
||||
|
||||
func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
|
||||
var envelopeBloom []byte
|
||||
rawValue := make([]byte, len(i.Value()))
|
||||
copy(rawValue, i.Value())
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package mailserver
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
waku "github.com/status-im/status-go/waku/common"
|
||||
)
|
||||
|
||||
func TestLevelDB_BuildIteratorWithTopic(t *testing.T) {
|
||||
topic := []byte{0x01, 0x02, 0x03, 0x04}
|
||||
|
||||
dir, err := ioutil.TempDir("/tmp", "status-go-test-level-db")
|
||||
require.NoError(t, err)
|
||||
|
||||
db, err := NewLevelDB(dir)
|
||||
|
||||
defer func() {
|
||||
_ = os.Remove(dir)
|
||||
}()
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
envelope, err := newTestEnvelope(topic)
|
||||
require.NoError(t, err)
|
||||
err = db.SaveEnvelope(envelope)
|
||||
require.NoError(t, err)
|
||||
|
||||
iter, err := db.BuildIterator(CursorQuery{
|
||||
start: NewDBKey(uint32(time.Now().Add(-time.Hour).Unix()), types.BytesToTopic(topic), types.Hash{}).Bytes(),
|
||||
end: NewDBKey(uint32(time.Now().Add(time.Second).Unix()), types.BytesToTopic(topic), types.Hash{}).Bytes(),
|
||||
topics: [][]byte{topic},
|
||||
limit: 10,
|
||||
})
|
||||
topicsMap := make(map[types.TopicType]bool)
|
||||
topicsMap[types.BytesToTopic(topic)] = true
|
||||
require.NoError(t, err)
|
||||
hasNext := iter.Next()
|
||||
require.True(t, hasNext)
|
||||
rawValue, err := iter.GetEnvelopeByTopicsMap(topicsMap)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, rawValue)
|
||||
var receivedEnvelope waku.Envelope
|
||||
err = rlp.DecodeBytes(rawValue, &receivedEnvelope)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, waku.BytesToTopic(topic), receivedEnvelope.Topic)
|
||||
|
||||
err = iter.Release()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, iter.Error())
|
||||
}
|
|
@ -105,7 +105,17 @@ func (i *postgresIterator) Release() error {
|
|||
return i.Close()
|
||||
}
|
||||
|
||||
func (i *postgresIterator) GetEnvelope(bloom []byte) ([]byte, error) {
|
||||
func (i *postgresIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
|
||||
var value []byte
|
||||
var id []byte
|
||||
if err := i.Scan(&id, &value); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (i *postgresIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
|
||||
var value []byte
|
||||
var id []byte
|
||||
if err := i.Scan(&id, &value); err != nil {
|
||||
|
|
|
@ -39,7 +39,7 @@ func TestPostgresDB_BuildIteratorWithBloomFilter(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
hasNext := iter.Next()
|
||||
require.True(t, hasNext)
|
||||
rawValue, err := iter.GetEnvelope(nil)
|
||||
rawValue, err := iter.GetEnvelopeByBloomFilter(nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, rawValue)
|
||||
var receivedEnvelope waku.Envelope
|
||||
|
@ -72,7 +72,7 @@ func TestPostgresDB_BuildIteratorWithTopic(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
hasNext := iter.Next()
|
||||
require.True(t, hasNext)
|
||||
rawValue, err := iter.GetEnvelope(nil)
|
||||
rawValue, err := iter.GetEnvelopeByBloomFilter(nil)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, rawValue)
|
||||
var receivedEnvelope waku.Envelope
|
||||
|
|
|
@ -38,7 +38,6 @@ func TestNewNodeConfigWithDefaults(t *testing.T) {
|
|||
assert.Equal(t, params.FleetProd, c.ClusterConfig.Fleet)
|
||||
assert.NotEmpty(t, c.ClusterConfig.BootNodes)
|
||||
assert.NotEmpty(t, c.ClusterConfig.StaticNodes)
|
||||
assert.NotEmpty(t, c.ClusterConfig.RendezvousNodes)
|
||||
assert.NotEmpty(t, c.ClusterConfig.PushNotificationsServers)
|
||||
// assert LES
|
||||
assert.Equal(t, true, c.LightEthConfig.Enabled)
|
||||
|
|
Loading…
Reference in New Issue