Add `shhext_getNewFilterMessages` function to RPC APIs.

This function returns only the new messages from the filter, never
returns the same message for the same user twice.
This commit is contained in:
Igor Mandrigin 2018-04-20 13:26:54 +02:00 committed by Igor Mandrigin
parent 92e02189c7
commit f4cd8d27b5
16 changed files with 612 additions and 33 deletions

2
Gopkg.lock generated
View File

@ -483,6 +483,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "bc797975b48cf461eb29215dfd5870a3f74a1f4cf1172bdd11372d1d693461ea"
inputs-digest = "52c415dac55089fc7b13ce6cd05f773a88a62e5e472952f13eb49c4605711f90"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -123,7 +123,7 @@ docker-test: ##@tests Run tests in a docker container with golang.
test: test-unit-coverage ##@tests Run basic, short tests during development
test-unit: ##@tests Run unit and integration tests
go test $(UNIT_TEST_PACKAGES) $(gotest_extraflags)
go test -v $(UNIT_TEST_PACKAGES) $(gotest_extraflags)
test-unit-coverage: ##@tests Run unit and integration tests with coverage
go test -coverpkg= $(UNIT_TEST_PACKAGES) $(gotest_extraflags)

View File

@ -190,3 +190,19 @@ index 880cced09..702556079 100644
func isFullNode(bloom []byte) bool {
if bloom == nil {
return true
@@ -1048,3 +1139,15 @@ func addBloom(a, b []byte) []byte {
}
return c
}
+
+// SelectedKeyPairID returns the id of currently selected key pair.
+// It helps distinguish between different users w/o exposing the user identity itself.
+func (whisper *Whisper) SelectedKeyPairID() string {
+ whisper.keyMu.RLock()
+ defer whisper.keyMu.RUnlock()
+
+ for id := range whisper.privateKeys {
+ return id
+ }
+ return ""
+}

View File

@ -1,14 +1,50 @@
package db
import (
"path/filepath"
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
)
type storagePrefix byte
const (
// PeersCache is used for the db entries used for peers DB
PeersCache storagePrefix = iota
// DeduplicatorCache is used for the db entries used for messages
// deduplication cache
DeduplicatorCache
)
// Key creates a DB key for a specified service with specified data
func Key(prefix storagePrefix, data ...[]byte) []byte {
keyLength := 1
for _, d := range data {
keyLength += len(d)
}
key := make([]byte, keyLength)
key[0] = byte(prefix)
startPos := 1
for _, d := range data {
copy(key[startPos:], d[:])
startPos += len(d)
}
return key
}
// Create returns status pointer to leveldb.DB.
func Create(path string) (*leveldb.DB, error) {
func Create(path, dbName string) (*leveldb.DB, error) {
// Create euphemeral storage if the node config path isn't provided
if path == "" {
return leveldb.Open(storage.NewMemStorage(), nil)
}
path = filepath.Join(path, dbName)
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {

37
geth/db/db_test.go Normal file
View File

@ -0,0 +1,37 @@
package db
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDBKey(t *testing.T) {
data1 := []byte{0x01, 0x02, 0x03}
data2 := []byte{0x04, 0x05, 0x06, 0x07, 0x08}
key := Key(PeersCache, data1, data2)
assert.Equal(t, len(data1)+len(data2)+1, len(key))
assert.Equal(t, byte(PeersCache), key[0])
expectedKey := append([]byte{byte(PeersCache)}, data1...)
expectedKey = append(expectedKey, data2...)
assert.Equal(t, expectedKey, key)
key = Key(DeduplicatorCache, data1)
assert.Equal(t, len(data1)+1, len(key))
assert.Equal(t, byte(DeduplicatorCache), key[0])
expectedKey = append([]byte{byte(DeduplicatorCache)}, data1...)
assert.Equal(t, expectedKey, key)
key = Key(DeduplicatorCache, data2)
assert.Equal(t, len(data2)+1, len(key))
assert.Equal(t, byte(DeduplicatorCache), key[0])
expectedKey = append([]byte{byte(DeduplicatorCache)}, data2...)
assert.Equal(t, expectedKey, key)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/peers"
"github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/services/shhext"
)
// tickerResolution is the delta to check blockchain sync progress.
@ -98,6 +99,17 @@ func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceCo
return err
}
statusDB, err := db.Create(n.config.DataDir, params.StatusDatabase)
if err != nil {
return err
}
n.db = statusDB
if err := n.setupDeduplicator(); err != nil {
return err
}
if n.config.Discovery {
return n.startPeerPool()
}
@ -105,6 +117,20 @@ func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceCo
return nil
}
func (n *StatusNode) setupDeduplicator() error {
var s shhext.Service
err := n.gethService(&s)
if err == node.ErrServiceUnknown {
return nil
}
if err != nil {
return err
}
return s.Deduplicator.Start(n.db)
}
func (n *StatusNode) createNode(config *params.NodeConfig) (err error) {
n.gethNode, err = MakeNode(config)
return
@ -143,11 +169,6 @@ func (n *StatusNode) setupRPCClient() (err error) {
}
func (n *StatusNode) startPeerPool() error {
statusDB, err := db.Create(filepath.Join(n.config.DataDir, params.StatusDatabase))
if err != nil {
return err
}
n.db = statusDB
n.register = peers.NewRegister(n.config.RegisterTopics...)
// TODO(dshulyak) consider adding a flag to define this behaviour
stopOnMax := len(n.config.RegisterTopics) == 0
@ -182,7 +203,6 @@ func (n *StatusNode) stop() error {
}
n.register = nil
n.peerPool = nil
n.db = nil
if err := n.gethNode.Stop(); err != nil {
return err
@ -195,6 +215,11 @@ func (n *StatusNode) stop() error {
n.gethNode = nil
n.config = nil
if err := n.db.Close(); err != nil {
return err
}
n.db = nil
return nil
}
@ -205,7 +230,7 @@ func (n *StatusNode) stopPeerPool() error {
n.register.Stop()
n.peerPool.Stop()
return n.db.Close()
return nil
}
// ResetChainData removes chain data if node is not running.

View File

@ -3,6 +3,7 @@ package peers
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/db"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
@ -18,12 +19,7 @@ type Cache struct {
}
func makePeerKey(peerID discv5.NodeID, topic discv5.Topic) []byte {
topicLen := len([]byte(topic))
lth := topicLen + len(peerID)
key := make([]byte, lth)
copy(key[:], topic[:])
copy(key[topicLen:], peerID[:])
return key
return db.Key(db.PeersCache, []byte(topic), peerID[:])
}
// AddPeer stores peer with a following key: <topic><peer ID>
@ -42,9 +38,7 @@ func (d *Cache) RemovePeer(peerID discv5.NodeID, topic discv5.Topic) error {
// GetPeersRange returns peers for a given topic with a limit.
func (d *Cache) GetPeersRange(topic discv5.Topic, limit int) (nodes []*discv5.Node) {
topicLen := len([]byte(topic))
key := make([]byte, topicLen)
copy(key[:], []byte(topic))
key := db.Key(db.PeersCache, []byte(topic))
iterator := d.db.NewIterator(&util.Range{Start: key}, nil)
defer iterator.Release()
count := 0

View File

@ -1,9 +1,7 @@
package peers
import (
"io/ioutil"
"net"
"os"
"testing"
"github.com/ethereum/go-ethereum/p2p/discv5"
@ -13,12 +11,7 @@ import (
)
func TestPeersRange(t *testing.T) {
path, err := ioutil.TempDir("/tmp", "status-peers-test-")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(path))
}()
rootDB, err := db.Create(path)
rootDB, err := db.Create("", "status-peers-test")
require.NoError(t, err)
defer func() {
assert.NoError(t, rootDB.Close())

View File

@ -121,6 +121,15 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (boo
return true, nil
}
// GetNewFilterMessages is a prototype method with deduplication
func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]*whisper.Message, error) {
msgs, err := api.publicAPI.GetFilterMessages(filterID)
if err != nil {
return nil, err
}
return api.service.Deduplicator.Deduplicate(msgs), err
}
// -----
// HELPER
// -----
@ -151,5 +160,4 @@ func makePayload(r MessagesRequest) []byte {
binary.BigEndian.PutUint32(data[4:], r.To)
copy(data[8:], whisper.TopicToBloom(r.Topic))
return data
}

View File

@ -0,0 +1,110 @@
package dedup
import (
"time"
"github.com/ethereum/go-ethereum/crypto/sha3"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/db"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
// cache represents a cache of whisper messages with a limit of 2 days.
// the limit is counted from the time when the message was added to the cache.
type cache struct {
db *leveldb.DB
now func() time.Time
}
func newCache(db *leveldb.DB) *cache {
return &cache{db, time.Now}
}
func (d *cache) Has(filterID string, message *whisper.Message) (bool, error) {
has, err := d.db.Has(d.keyToday(filterID, message), nil)
if err != nil {
return false, err
}
if has {
return true, nil
}
return d.db.Has(d.keyYesterday(filterID, message), nil)
}
func (d *cache) Put(filterID string, messages []*whisper.Message) error {
batch := leveldb.Batch{}
for _, msg := range messages {
batch.Put(d.keyToday(filterID, msg), []byte{})
}
err := d.db.Write(&batch, nil)
if err != nil {
return err
}
return d.cleanOldEntries()
}
func (d *cache) cleanOldEntries() error {
// Cleaning up everything that is older than 2 days
// We are using the fact that leveldb can do prefix queries and that
// the entries are sorted by keys.
// Here, we are looking for all the keys that are between
// 00000000.* and <yesterday's date>.*
// e.g. (0000000.* -> 20180424.*)
limit := d.yesterdayDateString()
r := &util.Range{
Start: db.Key(db.DeduplicatorCache, []byte("00000000")),
Limit: db.Key(db.DeduplicatorCache, []byte(limit)),
}
batch := leveldb.Batch{}
iter := d.db.NewIterator(r, nil)
for iter.Next() {
batch.Delete(iter.Key())
}
iter.Release()
return d.db.Write(&batch, nil)
}
func (d *cache) keyYesterday(filterID string, message *whisper.Message) []byte {
return prefixedKey(d.yesterdayDateString(), filterID, message)
}
func (d *cache) keyToday(filterID string, message *whisper.Message) []byte {
return prefixedKey(d.todayDateString(), filterID, message)
}
func (d *cache) todayDateString() string {
return dateString(d.now())
}
func (d *cache) yesterdayDateString() string {
now := d.now()
yesterday := now.Add(-24 * time.Hour)
return dateString(yesterday)
}
func dateString(t time.Time) string {
// Layouts must use the reference time Mon Jan 2 15:04:05 MST 2006
return t.Format("20060102")
}
func prefixedKey(date, filterID string, message *whisper.Message) []byte {
return db.Key(db.DeduplicatorCache, []byte(date), []byte(filterID), key(message))
}
func key(message *whisper.Message) []byte {
data := make([]byte, len(message.Payload)+len(message.Topic))
copy(data[:], message.Payload)
copy(data[len(message.Payload):], message.Topic[:])
digest := sha3.Sum512(data)
return digest[:]
}

View File

@ -0,0 +1,128 @@
package dedup
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
func TestDedupCacheTestSuite(t *testing.T) {
suite.Run(t, new(DedupCacheTestSuite))
}
type DedupCacheTestSuite struct {
suite.Suite
c *cache
db *leveldb.DB
}
func (s *DedupCacheTestSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
panic(err)
}
s.db = db
s.c = newCache(db)
}
func (s *DedupCacheTestSuite) TearDownTest() {
s.NoError(s.db.Close())
}
func (s *DedupCacheTestSuite) TestMultipleFilterIDs() {
filterID1 := "filter-id1"
filterID2 := "filter-id2"
filterID3 := "filter-id"
messagesFilter1 := generateMessages(10)
s.NoError(s.c.Put(filterID1, messagesFilter1))
for _, msg := range messagesFilter1 {
has, err := s.c.Has(filterID1, msg)
s.NoError(err)
s.True(has)
has, err = s.c.Has(filterID2, msg)
s.NoError(err)
s.False(has)
has, err = s.c.Has(filterID3, msg)
s.NoError(err)
s.False(has)
}
messagesFilter2 := generateMessages(10)
s.NoError(s.c.Put(filterID2, messagesFilter2))
for _, msg := range messagesFilter2 {
has, err := s.c.Has(filterID1, msg)
s.NoError(err)
s.False(has)
has, err = s.c.Has(filterID2, msg)
s.NoError(err)
s.True(has)
has, err = s.c.Has(filterID3, msg)
s.NoError(err)
s.False(has)
}
}
func (s *DedupCacheTestSuite) TestCleaningUp() {
filterID := "filter-id"
// - 2 days
s.c.now = func() time.Time { return time.Now().Add(-48 * time.Hour) }
messages2DaysOld := generateMessages(10)
s.NoError(s.c.Put(filterID, messages2DaysOld))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
// - 1 days
s.c.now = func() time.Time { return time.Now().Add(-24 * time.Hour) }
messages1DayOld := generateMessages(10)
s.NoError(s.c.Put(filterID, messages1DayOld))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
for _, msg := range messages1DayOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
// now
s.c.now = time.Now
messagesToday := generateMessages(10)
s.NoError(s.c.Put(filterID, messagesToday))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.False(has)
}
for _, msg := range messages1DayOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
for _, msg := range messagesToday {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
}

View File

@ -0,0 +1,63 @@
package dedup
import (
"github.com/ethereum/go-ethereum/log"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
)
type keyPairProvider interface {
SelectedKeyPairID() string
}
// Deduplicator filters out already received messages for a current filter.
// It keeps a limited cache of the messages.
type Deduplicator struct {
keyPairProvider keyPairProvider
cache *cache
log log.Logger
}
// NewDeduplicator creates a new deduplicator
func NewDeduplicator(keyPairProvider keyPairProvider) *Deduplicator {
return &Deduplicator{
log: log.New("package", "status-go/services/sshext.deduplicator"),
keyPairProvider: keyPairProvider,
}
}
// Start enabled deduplication.
func (d *Deduplicator) Start(db *leveldb.DB) error {
d.cache = newCache(db)
return nil
}
// Deduplicate receives a list of whisper messages and
// returns the list of the messages that weren't filtered previously for the
// specified filter.
func (d *Deduplicator) Deduplicate(messages []*whisper.Message) []*whisper.Message {
if d.cache == nil {
d.log.Info("Deduplication wasn't started. Returning all the messages.")
return messages
}
result := make([]*whisper.Message, 0)
for _, message := range messages {
if has, err := d.cache.Has(d.keyPairProvider.SelectedKeyPairID(), message); !has {
if err != nil {
d.log.Error("error while deduplicating messages: search cache failed", "err", err)
}
result = append(result, message)
}
}
// Put all the messages there, for simplicity.
// That way, we will always have repeating messages in the current day.
// Performance implications seem negligible on 30000 messages/day
err := d.cache.Put(d.keyPairProvider.SelectedKeyPairID(), messages)
if err != nil {
d.log.Error("error while deduplicating messages: cache update failed", "err", err)
}
return result
}

View File

@ -0,0 +1,128 @@
package dedup
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
type dummyKeyPairProvider struct {
id string
}
func (p dummyKeyPairProvider) SelectedKeyPairID() string {
return p.id
}
func BenchmarkDeduplicate30000MessagesADay(b *testing.B) {
// using on-disk db here for real benchmarks
dir, err := ioutil.TempDir("", "dedup-30000")
if err != nil {
panic(err)
}
defer func() {
err := os.RemoveAll(dir)
if err != nil {
panic(err)
}
}()
db, err := leveldb.OpenFile(dir, nil)
if err != nil {
panic(err)
}
d := NewDeduplicator(dummyKeyPairProvider{})
if err := d.Start(db); err != nil {
panic(err)
}
b.Log("generating messages")
messagesOld := generateMessages(100000)
b.Log("generation is done")
// pre-fill deduplicator
d.Deduplicate(messagesOld[:1000])
b.ResetTimer()
length := 300
start := 1000
for n := 0; n < b.N; n++ {
if n%100 == 0 {
d.cache.now = func() time.Time { return time.Now().Add(time.Duration(24*(n/100)) * time.Hour) }
}
if (start + length) >= len(messagesOld) {
start = 0
fmt.Println("cycle!")
}
messages := messagesOld[start:(start + length)]
start += length
d.Deduplicate(messages)
}
}
func TestDeduplicatorTestSuite(t *testing.T) {
suite.Run(t, new(DeduplicatorTestSuite))
}
type DeduplicatorTestSuite struct {
suite.Suite
d *Deduplicator
db *leveldb.DB
}
func (s *DeduplicatorTestSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
panic(err)
}
s.db = db
s.d = NewDeduplicator(dummyKeyPairProvider{})
s.NoError(s.d.Start(db))
}
func (s *DeduplicatorTestSuite) TearDownTest() {
s.NoError(s.db.Close())
}
func (s *DeduplicatorTestSuite) TestDeduplicateSingleFilter() {
s.d.keyPairProvider = dummyKeyPairProvider{"acc1"}
messages1 := generateMessages(10)
messages2 := generateMessages(12)
result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
result = s.d.Deduplicate(messages1)
s.Equal(0, len(result))
result = s.d.Deduplicate(messages2)
s.Equal(len(messages2), len(result))
messages3 := append(messages2, generateMessages(11)...)
result = s.d.Deduplicate(messages3)
s.Equal(11, len(result))
}
func (s *DeduplicatorTestSuite) TestDeduplicateMultipleFilters() {
messages1 := generateMessages(10)
s.d.keyPairProvider = dummyKeyPairProvider{"acc1"}
result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
result = s.d.Deduplicate(messages1)
s.Equal(0, len(result))
s.d.keyPairProvider = dummyKeyPairProvider{"acc2"}
result = s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
}

View File

@ -0,0 +1,26 @@
package dedup
import (
"crypto/rand"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
)
func generateMessages(count int) []*whisper.Message {
result := []*whisper.Message{}
for ; count > 0; count-- {
content := mustGenerateRandomBytes()
result = append(result, &whisper.Message{Payload: content})
}
return result
}
func mustGenerateRandomBytes() []byte {
c := 2048
b := make([]byte, c)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return b
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/services/shhext/dedup"
)
// EnvelopeState in local tracker
@ -30,9 +31,10 @@ type EnvelopeEventsHandler interface {
// Service is a service that provides some additional Whisper API.
type Service struct {
w *whisper.Whisper
tracker *tracker
nodeID *ecdsa.PrivateKey
w *whisper.Whisper
tracker *tracker
nodeID *ecdsa.PrivateKey
Deduplicator *dedup.Deduplicator
}
// Make sure that Service implements node.Service interface.
@ -46,8 +48,9 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler) *Service {
cache: map[common.Hash]EnvelopeState{},
}
return &Service{
w: w,
tracker: track,
w: w,
tracker: track,
Deduplicator: dedup.NewDeduplicator(w),
}
}

View File

@ -1177,3 +1177,15 @@ func addBloom(a, b []byte) []byte {
}
return c
}
// SelectedKeyPairID returns the id of currently selected key pair.
// It helps distinguish between different users w/o exposing the user identity itself.
func (whisper *Whisper) SelectedKeyPairID() string {
whisper.keyMu.RLock()
defer whisper.keyMu.RUnlock()
for id := range whisper.privateKeys {
return id
}
return ""
}