Deduplicator: add API to confirm received messages.

This commit is contained in:
Igor Mandrigin 2018-05-02 14:14:08 +02:00 committed by Igor Mandrigin
parent 37a58a513d
commit a933885806
9 changed files with 98 additions and 69 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/timesource" "github.com/status-im/status-go/timesource"
"github.com/syndtr/goleveldb/leveldb"
) )
// Errors related to node and services creation. // Errors related to node and services creation.
@ -42,7 +43,7 @@ var (
var logger = log.New("package", "status-go/geth/node") var logger = log.New("package", "status-go/geth/node")
// MakeNode create a geth node entity // MakeNode create a geth node entity
func MakeNode(config *params.NodeConfig) (*node.Node, error) { func MakeNode(config *params.NodeConfig, db *leveldb.DB) (*node.Node, error) {
// If DataDir is empty, it means we want to create an ephemeral node // If DataDir is empty, it means we want to create an ephemeral node
// keeping data only in memory. // keeping data only in memory.
if config.DataDir != "" { if config.DataDir != "" {
@ -92,7 +93,7 @@ func MakeNode(config *params.NodeConfig) (*node.Node, error) {
} }
// start Whisper service. // start Whisper service.
if err := activateShhService(stack, config); err != nil { if err := activateShhService(stack, config, db); err != nil {
return nil, fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err) return nil, fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err)
} }
@ -188,7 +189,7 @@ func activateStatusService(stack *node.Node, config *params.NodeConfig) error {
} }
// activateShhService configures Whisper and adds it to the given node. // activateShhService configures Whisper and adds it to the given node.
func activateShhService(stack *node.Node, config *params.NodeConfig) (err error) { func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) {
if config.WhisperConfig == nil || !config.WhisperConfig.Enabled { if config.WhisperConfig == nil || !config.WhisperConfig.Enabled {
logger.Info("SHH protocol is disabled") logger.Info("SHH protocol is disabled")
return nil return nil
@ -254,7 +255,7 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) (err error)
return nil, err return nil, err
} }
svc := shhext.New(whisper, shhext.EnvelopeSignalHandler{}) svc := shhext.New(whisper, shhext.EnvelopeSignalHandler{}, db)
return svc, nil return svc, nil
}) })
} }

View File

@ -22,7 +22,6 @@ import (
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/peers" "github.com/status-im/status-go/geth/peers"
"github.com/status-im/status-go/geth/rpc" "github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
) )
@ -78,16 +77,8 @@ func (n *StatusNode) GethNode() *node.Node {
return n.gethNode return n.gethNode
} }
// Start starts current StatusNode, will fail if it's already started. func (n *StatusNode) startWithDB(config *params.NodeConfig, db *leveldb.DB, services []node.ServiceConstructor) error {
func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceConstructor) error { if err := n.createNode(config, db); err != nil {
n.mu.Lock()
defer n.mu.Unlock()
if n.isRunning() {
return ErrNodeRunning
}
if err := n.createNode(config); err != nil {
return err return err
} }
n.config = config n.config = config
@ -100,39 +91,44 @@ func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceCo
return err return err
} }
statusDB, err := db.Create(n.config.DataDir, params.StatusDatabase) n.db = db
if err != nil {
return err
}
n.db = statusDB
if err := n.setupDeduplicator(); err != nil {
return err
}
if n.config.NoDiscovery { if n.config.NoDiscovery {
return nil return nil
} }
return n.startPeerPool() return n.startPeerPool()
} }
func (n *StatusNode) setupDeduplicator() error { // Start starts current StatusNode, will fail if it's already started.
var s shhext.Service func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceConstructor) error {
n.mu.Lock()
defer n.mu.Unlock()
err := n.gethService(&s) if n.isRunning() {
if err == node.ErrServiceUnknown { return ErrNodeRunning
return nil
} }
db, err := db.Create(config.DataDir, params.StatusDatabase)
if err != nil { if err != nil {
return err return err
} }
return s.Deduplicator.Start(n.db) err = n.startWithDB(config, db, services)
if err != nil {
if dberr := db.Close(); dberr != nil {
n.log.Error("error while closing leveldb after node crash", "error", dberr)
}
n.db = nil
return err
}
return nil
} }
func (n *StatusNode) createNode(config *params.NodeConfig) (err error) { func (n *StatusNode) createNode(config *params.NodeConfig, db *leveldb.DB) (err error) {
n.gethNode, err = MakeNode(config) n.gethNode, err = MakeNode(config, n.db)
return return err
} }
// start starts current StatusNode, will fail if it's already started. // start starts current StatusNode, will fail if it's already started.
@ -214,16 +210,19 @@ func (n *StatusNode) stop() error {
n.gethNode = nil n.gethNode = nil
n.config = nil n.config = nil
if err := n.db.Close(); err != nil { if n.db != nil {
err := n.db.Close()
n.db = nil
return err return err
} }
n.db = nil
return nil return nil
} }
func (n *StatusNode) stopPeerPool() error { func (n *StatusNode) stopPeerPool() error {
if n.config.NoDiscovery { if n.config == nil || n.config.NoDiscovery {
return nil return nil
} }

View File

@ -4,9 +4,34 @@ Whisper API Extension
API API
--- ---
#### shhext_getNewFilterMessages
Accepts the same input as [`shh_getFilterMessages`](https://github.com/ethereum/wiki/wiki/JSON-RPC#shh_getFilterChanges).
##### Returns
Returns a list of whisper messages matching the specified filter. Filters out
the messages already confirmed received by [`shhext_confirmMessagesProcessed`](#shhextconfirmmessagesprocessed)
Deduplication is made using the whisper envelope content and topic only, so the
same content received in different whisper envelopes will be deduplicated.
#### shhext_confirmMessagesProcessed
Confirms whisper messages received and processed on the client side. These
messages won't appear anymore when [`shhext_getNewFilterMessages`](#shhextgetnewfiltermessages)
is called.
##### Parameters
Gets a list of whisper envelopes.
#### shhext_post #### shhext_post
Accepts same input as shh_post (see https://github.com/ethereum/wiki/wiki/JSON-RPC#shh_post) Accepts same input as [`shh_post`](https://github.com/ethereum/wiki/wiki/JSON-RPC#shh_post).
##### Returns ##### Returns

View File

@ -124,7 +124,13 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]*whisper.Message,
if err != nil { if err != nil {
return nil, err return nil, err
} }
return api.service.Deduplicator.Deduplicate(msgs), err return api.service.deduplicator.Deduplicate(msgs), err
}
// ConfirmMessagesProcessed is a method to confirm that messages was consumed by
// the client side.
func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) error {
return api.service.deduplicator.AddMessages(messages)
} }
// ----- // -----

View File

@ -19,27 +19,18 @@ type Deduplicator struct {
} }
// NewDeduplicator creates a new deduplicator // NewDeduplicator creates a new deduplicator
func NewDeduplicator(keyPairProvider keyPairProvider) *Deduplicator { func NewDeduplicator(keyPairProvider keyPairProvider, db *leveldb.DB) *Deduplicator {
return &Deduplicator{ return &Deduplicator{
log: log.New("package", "status-go/services/sshext.deduplicator"), log: log.New("package", "status-go/services/sshext.deduplicator"),
keyPairProvider: keyPairProvider, keyPairProvider: keyPairProvider,
cache: newCache(db),
} }
} }
// Start enabled deduplication.
func (d *Deduplicator) Start(db *leveldb.DB) error {
d.cache = newCache(db)
return nil
}
// Deduplicate receives a list of whisper messages and // Deduplicate receives a list of whisper messages and
// returns the list of the messages that weren't filtered previously for the // returns the list of the messages that weren't filtered previously for the
// specified filter. // specified filter.
func (d *Deduplicator) Deduplicate(messages []*whisper.Message) []*whisper.Message { func (d *Deduplicator) Deduplicate(messages []*whisper.Message) []*whisper.Message {
if d.cache == nil {
d.log.Info("Deduplication wasn't started. Returning all the messages.")
return messages
}
result := make([]*whisper.Message, 0) result := make([]*whisper.Message, 0)
for _, message := range messages { for _, message := range messages {
@ -51,13 +42,11 @@ func (d *Deduplicator) Deduplicate(messages []*whisper.Message) []*whisper.Messa
} }
} }
// Put all the messages there, for simplicity.
// That way, we will always have repeating messages in the current day.
// Performance implications seem negligible on 30000 messages/day
err := d.cache.Put(d.keyPairProvider.SelectedKeyPairID(), messages)
if err != nil {
d.log.Error("error while deduplicating messages: cache update failed", "err", err)
}
return result return result
} }
// AddMessages adds a message to the deduplicator DB, so it will be filtered
// out.
func (d *Deduplicator) AddMessages(messages []*whisper.Message) error {
return d.cache.Put(d.keyPairProvider.SelectedKeyPairID(), messages)
}

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
@ -39,10 +40,7 @@ func BenchmarkDeduplicate30000MessagesADay(b *testing.B) {
panic(err) panic(err)
} }
d := NewDeduplicator(dummyKeyPairProvider{}) d := NewDeduplicator(dummyKeyPairProvider{}, db)
if err := d.Start(db); err != nil {
panic(err)
}
b.Log("generating messages") b.Log("generating messages")
messagesOld := generateMessages(100000) messagesOld := generateMessages(100000)
@ -65,6 +63,7 @@ func BenchmarkDeduplicate30000MessagesADay(b *testing.B) {
messages := messagesOld[start:(start + length)] messages := messagesOld[start:(start + length)]
start += length start += length
d.Deduplicate(messages) d.Deduplicate(messages)
assert.NoError(b, d.AddMessages(messages))
} }
} }
@ -84,8 +83,7 @@ func (s *DeduplicatorTestSuite) SetupTest() {
panic(err) panic(err)
} }
s.db = db s.db = db
s.d = NewDeduplicator(dummyKeyPairProvider{}) s.d = NewDeduplicator(dummyKeyPairProvider{}, db)
s.NoError(s.d.Start(db))
} }
func (s *DeduplicatorTestSuite) TearDownTest() { func (s *DeduplicatorTestSuite) TearDownTest() {
@ -99,12 +97,14 @@ func (s *DeduplicatorTestSuite) TestDeduplicateSingleFilter() {
result := s.d.Deduplicate(messages1) result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result)) s.Equal(len(messages1), len(result))
s.NoError(s.d.AddMessages(messages1))
result = s.d.Deduplicate(messages1) result = s.d.Deduplicate(messages1)
s.Equal(0, len(result)) s.Equal(0, len(result))
result = s.d.Deduplicate(messages2) result = s.d.Deduplicate(messages2)
s.Equal(len(messages2), len(result)) s.Equal(len(messages2), len(result))
s.NoError(s.d.AddMessages(messages2))
messages3 := append(messages2, generateMessages(11)...) messages3 := append(messages2, generateMessages(11)...)
@ -119,6 +119,8 @@ func (s *DeduplicatorTestSuite) TestDeduplicateMultipleFilters() {
result := s.d.Deduplicate(messages1) result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result)) s.Equal(len(messages1), len(result))
s.NoError(s.d.AddMessages(messages1))
result = s.d.Deduplicate(messages1) result = s.d.Deduplicate(messages1)
s.Equal(0, len(result)) s.Equal(0, len(result))

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/services/shhext/dedup" "github.com/status-im/status-go/services/shhext/dedup"
"github.com/syndtr/goleveldb/leveldb"
) )
// EnvelopeState in local tracker // EnvelopeState in local tracker
@ -34,14 +35,14 @@ type Service struct {
w *whisper.Whisper w *whisper.Whisper
tracker *tracker tracker *tracker
nodeID *ecdsa.PrivateKey nodeID *ecdsa.PrivateKey
Deduplicator *dedup.Deduplicator deduplicator *dedup.Deduplicator
} }
// Make sure that Service implements node.Service interface. // Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil) var _ node.Service = (*Service)(nil)
// New returns a new Service. // New returns a new Service.
func New(w *whisper.Whisper, handler EnvelopeEventsHandler) *Service { func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB) *Service {
track := &tracker{ track := &tracker{
w: w, w: w,
handler: handler, handler: handler,
@ -50,7 +51,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler) *Service {
return &Service{ return &Service{
w: w, w: w,
tracker: track, tracker: track,
Deduplicator: dedup.NewDeduplicator(w), deduplicator: dedup.NewDeduplicator(w, db),
} }
} }

View File

@ -66,7 +66,7 @@ func (s *ShhExtSuite) SetupTest() {
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.whisper[i], nil return s.whisper[i], nil
})) }))
s.services[i] = New(s.whisper[i], nil) s.services[i] = New(s.whisper[i], nil, nil)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.services[i], nil return s.services[i], nil
})) }))
@ -159,7 +159,7 @@ func (s *ShhExtSuite) TestRequestMessages() {
}() }()
mock := newHandlerMock(1) mock := newHandlerMock(1)
service := New(shh, mock) service := New(shh, mock, nil)
api := NewPublicAPI(service) api := NewPublicAPI(service)
const ( const (

View File

@ -16,6 +16,8 @@ import (
"github.com/status-im/status-go/signal" "github.com/status-im/status-go/signal"
. "github.com/status-im/status-go/t/utils" . "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
) )
const ( const (
@ -253,8 +255,12 @@ func (s *APITestSuite) TestNodeStartCrash() {
nodeConfig, err := MakeTestNodeConfig(GetNetworkID()) nodeConfig, err := MakeTestNodeConfig(GetNetworkID())
s.NoError(err) s.NoError(err)
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.NoError(err)
defer func() { s.NoError(db.Close()) }()
// start node outside the manager (on the same port), so that manager node.Start() method fails // start node outside the manager (on the same port), so that manager node.Start() method fails
outsideNode, err := node.MakeNode(nodeConfig) outsideNode, err := node.MakeNode(nodeConfig, db)
s.NoError(err) s.NoError(err)
err = outsideNode.Start() err = outsideNode.Start()
s.NoError(err) s.NoError(err)