remove reSelectAccount from backend, SelectKeyPair and SelectedKeyPairID methods from Whisper

This commit is contained in:
Adam Babik 2019-12-18 23:24:38 +01:00 committed by Pedro Pombeiro
parent 024f30f0b9
commit 1ac515f19e
16 changed files with 95 additions and 700 deletions

View File

@ -210,12 +210,6 @@ func TestBackendAccountsConcurrently(t *testing.T) {
wg.Done() wg.Done()
}(tuple) }(tuple)
wg.Add(1)
go func() {
assert.NoError(t, backend.reSelectAccount())
wg.Done()
}()
wg.Add(1) wg.Add(1)
go func() { go func() {
assert.NoError(t, backend.Logout()) assert.NoError(t, backend.Logout())

View File

@ -74,6 +74,7 @@ type GethStatusBackend struct {
transactor *transactions.Transactor transactor *transactions.Transactor
connectionState connectionState connectionState connectionState
appState appState appState appState
selectedAccountShhKeyID string
log log.Logger log log.Logger
allowAllRPC bool // used only for tests, disables api method restrictions allowAllRPC bool // used only for tests, disables api method restrictions
} }
@ -112,6 +113,11 @@ func (b *GethStatusBackend) Transactor() *transactions.Transactor {
return b.transactor return b.transactor
} }
// SelectedAccountShhKeyID returns a Whisper key ID of the selected chat key pair.
func (b *GethStatusBackend) SelectedAccountShhKeyID() string {
return b.selectedAccountShhKeyID
}
// IsNodeRunning confirm that node is running // IsNodeRunning confirm that node is running
func (b *GethStatusBackend) IsNodeRunning() bool { func (b *GethStatusBackend) IsNodeRunning() bool {
return b.statusNode.IsRunning() return b.statusNode.IsRunning()
@ -121,13 +127,10 @@ func (b *GethStatusBackend) IsNodeRunning() bool {
func (b *GethStatusBackend) StartNode(config *params.NodeConfig) error { func (b *GethStatusBackend) StartNode(config *params.NodeConfig) error {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if err := b.startNode(config); err != nil { if err := b.startNode(config); err != nil {
signal.SendNodeCrashed(err) signal.SendNodeCrashed(err)
return err return err
} }
return nil return nil
} }
@ -458,12 +461,6 @@ func (b *GethStatusBackend) startNode(config *params.NodeConfig) (err error) {
} }
b.log.Info("Handlers registered") b.log.Info("Handlers registered")
if err = b.reSelectAccount(); err != nil {
b.log.Error("Reselect account failed", "err", err)
return
}
b.log.Info("Account reselected")
if st, err := b.statusNode.StatusService(); err == nil { if st, err := b.statusNode.StatusService(); err == nil {
st.SetAccountManager(b.accountManager) st.SetAccountManager(b.accountManager)
} }
@ -472,6 +469,16 @@ func (b *GethStatusBackend) startNode(config *params.NodeConfig) (err error) {
st.SetDiscoverer(b.StatusNode()) st.SetDiscoverer(b.StatusNode())
} }
// Handle a case when a node is stopped and resumed.
// If there is no account selected, an error is returned.
if _, err := b.accountManager.SelectedChatAccount(); err == nil {
if err := b.injectAccountIntoServices(); err != nil {
return err
}
} else if err != account.ErrNoAccountSelected {
return err
}
signal.SendNodeReady() signal.SendNodeReady()
if err := b.statusNode.StartDiscovery(); err != nil { if err := b.statusNode.StartDiscovery(); err != nil {
@ -786,6 +793,7 @@ func (b *GethStatusBackend) cleanupServices() error {
if err := whisperService.DeleteKeyPairs(); err != nil { if err := whisperService.DeleteKeyPairs(); err != nil {
return fmt.Errorf("%s: %v", ErrWhisperClearIdentitiesFailure, err) return fmt.Errorf("%s: %v", ErrWhisperClearIdentitiesFailure, err)
} }
b.selectedAccountShhKeyID = ""
default: default:
return err return err
} }
@ -817,28 +825,6 @@ func (b *GethStatusBackend) closeAppDB() error {
return nil return nil
} }
// reSelectAccount selects previously selected account, often, after node restart.
func (b *GethStatusBackend) reSelectAccount() error {
b.AccountManager().RemoveOnboarding()
selectedChatAccount, err := b.accountManager.SelectedChatAccount()
if selectedChatAccount == nil || err == account.ErrNoAccountSelected {
return nil
}
whisperService, err := b.statusNode.WhisperService()
switch err {
case node.ErrServiceUnknown: // Whisper was never registered
case nil:
if err := whisperService.SelectKeyPair(selectedChatAccount.AccountKey.PrivateKey); err != nil {
return ErrWhisperIdentityInjectionFailure
}
default:
return err
}
return nil
}
// SelectAccount selects current wallet and chat accounts, by verifying that each address has corresponding account which can be decrypted // SelectAccount selects current wallet and chat accounts, by verifying that each address has corresponding account which can be decrypted
// using provided password. Once verification is done, the decrypted chat key is injected into Whisper (as a single identity, // using provided password. Once verification is done, the decrypted chat key is injected into Whisper (as a single identity,
// all previous identities are removed). // all previous identities are removed).
@ -853,7 +839,15 @@ func (b *GethStatusBackend) SelectAccount(loginParams account.LoginParams) error
return err return err
} }
return b.injectAccountIntoServices() if err := b.injectAccountIntoServices(); err != nil {
return err
}
if err := b.startWallet(); err != nil {
return err
}
return nil
} }
func (b *GethStatusBackend) injectAccountIntoServices() error { func (b *GethStatusBackend) injectAccountIntoServices() error {
@ -862,11 +856,17 @@ func (b *GethStatusBackend) injectAccountIntoServices() error {
return err return err
} }
identity := chatAccount.AccountKey.PrivateKey
whisperService, err := b.statusNode.WhisperService() whisperService, err := b.statusNode.WhisperService()
switch err { switch err {
case node.ErrServiceUnknown: // Whisper was never registered case node.ErrServiceUnknown: // Whisper was never registered
case nil: case nil:
if err := whisperService.SelectKeyPair(chatAccount.AccountKey.PrivateKey); err != nil { if err := whisperService.DeleteKeyPairs(); err != nil { // err is not possible; method return value is incorrect
return err
}
b.selectedAccountShhKeyID, err = whisperService.AddKeyPair(identity)
if err != nil {
return ErrWhisperIdentityInjectionFailure return ErrWhisperIdentityInjectionFailure
} }
default: default:
@ -879,11 +879,11 @@ func (b *GethStatusBackend) injectAccountIntoServices() error {
return err return err
} }
if err := st.InitProtocol(b.appDB); err != nil { if err := st.InitProtocol(identity, b.appDB); err != nil {
return err return err
} }
} }
return b.startWallet() return nil
} }
func (b *GethStatusBackend) startWallet() error { func (b *GethStatusBackend) startWallet() error {
@ -910,11 +910,13 @@ func (b *GethStatusBackend) startWallet() error {
return wallet.StartReactor( return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(), b.statusNode.RPCClient().Ethclient(),
allAddresses, allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID)) new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
)
} }
// InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper. // InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper.
func (b *GethStatusBackend) InjectChatAccount(chatKeyHex, encryptionKeyHex string) error { // TODO: change the interface and omit the last argument.
func (b *GethStatusBackend) InjectChatAccount(chatKeyHex, _ string) error {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
@ -924,36 +926,9 @@ func (b *GethStatusBackend) InjectChatAccount(chatKeyHex, encryptionKeyHex strin
if err != nil { if err != nil {
return err return err
} }
b.accountManager.SetChatAccount(chatKey) b.accountManager.SetChatAccount(chatKey)
chatAccount, err := b.accountManager.SelectedChatAccount()
if err != nil {
return err
}
whisperService, err := b.statusNode.WhisperService() return b.injectAccountIntoServices()
switch err {
case node.ErrServiceUnknown: // Whisper was never registered
case nil:
if err := whisperService.SelectKeyPair(chatAccount.AccountKey.PrivateKey); err != nil {
return ErrWhisperIdentityInjectionFailure
}
default:
return err
}
if whisperService != nil {
st, err := b.statusNode.ShhExtService()
if err != nil {
return err
}
if err := st.InitProtocol(b.appDB); err != nil {
return err
}
}
return nil
} }
func appendIf(condition bool, services []gethnode.ServiceConstructor, service gethnode.ServiceConstructor) []gethnode.ServiceConstructor { func appendIf(condition bool, services []gethnode.ServiceConstructor, service gethnode.ServiceConstructor) []gethnode.ServiceConstructor {

View File

@ -66,12 +66,6 @@ func (w *gethWhisperWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.En
return NewGethSubscriptionWrapper(w.whisper.SubscribeEnvelopeEvents(events)) return NewGethSubscriptionWrapper(w.whisper.SubscribeEnvelopeEvents(events))
} }
// SelectedKeyPairID returns the id of currently selected key pair.
// It helps distinguish between different users w/o exposing the user identity itself.
func (w *gethWhisperWrapper) SelectedKeyPairID() string {
return w.whisper.SelectedKeyPairID()
}
func (w *gethWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { func (w *gethWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
return w.whisper.GetPrivateKey(id) return w.whisper.GetPrivateKey(id)
} }

View File

@ -91,12 +91,6 @@ func (w *nimbusWhisperWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.
panic("not implemented") panic("not implemented")
} }
// SelectedKeyPairID returns the id of currently selected key pair.
// It helps distinguish between different users w/o exposing the user identity itself.
func (w *nimbusWhisperWrapper) SelectedKeyPairID() string {
return ""
}
func (w *nimbusWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { func (w *nimbusWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
retVal := w.routineQueue.Send(func(c chan<- callReturn) { retVal := w.routineQueue.Send(func(c chan<- callReturn) {
idC, err := decodeHexID(id) idC, err := decodeHexID(id)

View File

@ -38,9 +38,6 @@ type Whisper interface {
// GetCurrentTime returns current time. // GetCurrentTime returns current time.
GetCurrentTime() time.Time GetCurrentTime() time.Time
// SelectedKeyPairID returns the id of currently selected key pair.
// It helps distinguish between different users w/o exposing the user identity itself.
SelectedKeyPairID() string
// GetPrivateKey retrieves the private key of the specified identity. // GetPrivateKey retrieves the private key of the specified identity.
GetPrivateKey(id string) (*ecdsa.PrivateKey, error) GetPrivateKey(id string) (*ecdsa.PrivateKey, error)

View File

@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/services/shhext/dedup"
"github.com/status-im/status-go/db" "github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver" "github.com/status-im/status-go/mailserver"
@ -403,24 +402,31 @@ func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (
} }
} }
type Author struct {
PublicKey types.HexBytes `json:"publicKey"`
Alias string `json:"alias"`
Identicon string `json:"identicon"`
}
type Metadata struct {
DedupID []byte `json:"dedupId"`
EncryptionID types.HexBytes `json:"encryptionId"`
MessageID types.HexBytes `json:"messageId"`
Author Author `json:"author"`
}
// ConfirmMessagesProcessedByID is a method to confirm that messages was consumed by // ConfirmMessagesProcessedByID is a method to confirm that messages was consumed by
// the client side. // the client side.
// TODO: this is broken now as it requires dedup ID while a message hash should be used. // TODO: this is broken now as it requires dedup ID while a message hash should be used.
func (api *PublicAPI) ConfirmMessagesProcessedByID(messageConfirmations []*dedup.Metadata) error { func (api *PublicAPI) ConfirmMessagesProcessedByID(messageConfirmations []*Metadata) error {
confirmationCount := len(messageConfirmations) confirmationCount := len(messageConfirmations)
dedupIDs := make([][]byte, confirmationCount) dedupIDs := make([][]byte, confirmationCount)
encryptionIDs := make([][]byte, confirmationCount) encryptionIDs := make([][]byte, confirmationCount)
for i, confirmation := range messageConfirmations { for i, confirmation := range messageConfirmations {
dedupIDs[i] = confirmation.DedupID dedupIDs[i] = confirmation.DedupID
encryptionIDs[i] = confirmation.EncryptionID encryptionIDs[i] = confirmation.EncryptionID
} }
return api.service.ConfirmMessagesProcessed(encryptionIDs)
if err := api.service.ConfirmMessagesProcessed(encryptionIDs); err != nil {
return err
}
return api.service.deduplicator.AddMessageByID(dedupIDs)
} }
// Post is used to send one-to-one for those who did not enabled device-to-device sync, // Post is used to send one-to-one for those who did not enabled device-to-device sync,

View File

@ -1,125 +0,0 @@
package dedup
import (
"time"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/eth-node/types"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"golang.org/x/crypto/sha3"
)
// cache represents a cache of whisper messages with a limit of 2 days.
// the limit is counted from the time when the message was added to the cache.
type cache struct {
db *leveldb.DB
now func() time.Time
}
func newCache(db *leveldb.DB) *cache {
return &cache{db, time.Now}
}
func (d *cache) Has(filterID string, message *types.Message) (bool, error) {
has, err := d.db.Has(d.KeyToday(filterID, message), nil)
if err != nil {
return false, err
}
if has {
return true, nil
}
return d.db.Has(d.keyYesterday(filterID, message), nil)
}
func (d *cache) Put(filterID string, messages []*types.Message) error {
batch := leveldb.Batch{}
for _, msg := range messages {
batch.Put(d.KeyToday(filterID, msg), []byte{})
}
err := d.db.Write(&batch, nil)
if err != nil {
return err
}
return d.cleanOldEntries()
}
func (d *cache) PutIDs(messageIDs [][]byte) error {
batch := leveldb.Batch{}
for _, id := range messageIDs {
batch.Put(id, []byte{})
}
err := d.db.Write(&batch, nil)
if err != nil {
return err
}
return d.cleanOldEntries()
}
func (d *cache) cleanOldEntries() error {
// Cleaning up everything that is older than 2 days
// We are using the fact that leveldb can do prefix queries and that
// the entries are sorted by keys.
// Here, we are looking for all the keys that are between
// 00000000.* and <yesterday's date>.*
// e.g. (0000000.* -> 20180424.*)
limit := d.yesterdayDateString()
r := &util.Range{
Start: db.Key(db.DeduplicatorCache, []byte("00000000")),
Limit: db.Key(db.DeduplicatorCache, []byte(limit)),
}
batch := leveldb.Batch{}
iter := d.db.NewIterator(r, nil)
for iter.Next() {
batch.Delete(iter.Key())
}
iter.Release()
return d.db.Write(&batch, nil)
}
func (d *cache) keyYesterday(filterID string, message *types.Message) []byte {
return prefixedKey(d.yesterdayDateString(), filterID, message)
}
func (d *cache) KeyToday(filterID string, message *types.Message) []byte {
return prefixedKey(d.todayDateString(), filterID, message)
}
func (d *cache) todayDateString() string {
return dateString(d.now())
}
func (d *cache) yesterdayDateString() string {
now := d.now()
yesterday := now.Add(-24 * time.Hour)
return dateString(yesterday)
}
func dateString(t time.Time) string {
// Layouts must use the reference time Mon Jan 2 15:04:05 MST 2006
return t.Format("20060102")
}
func prefixedKey(date, filterID string, message *types.Message) []byte {
return db.Key(db.DeduplicatorCache, []byte(date), []byte(filterID), key(message))
}
func key(message *types.Message) []byte {
data := make([]byte, len(message.Payload)+len(message.Topic))
copy(data[:], message.Payload)
copy(data[len(message.Payload):], message.Topic[:])
digest := sha3.Sum512(data)
return digest[:]
}

View File

@ -1,130 +0,0 @@
package dedup
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
func TestDedupCacheTestSuite(t *testing.T) {
suite.Run(t, new(DedupCacheTestSuite))
}
type DedupCacheTestSuite struct {
suite.Suite
c *cache
db *leveldb.DB
}
func (s *DedupCacheTestSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
panic(err)
}
s.db = db
s.c = newCache(db)
}
func (s *DedupCacheTestSuite) TearDownTest() {
s.NoError(s.db.Close())
}
func (s *DedupCacheTestSuite) TestMultipleFilterIDs() {
const (
filterID1 = "filter-id1"
filterID2 = "filter-id2"
filterID3 = "filter-id"
)
messagesFilter1 := generateMessages(10)
s.NoError(s.c.Put(filterID1, messagesFilter1))
for _, msg := range messagesFilter1 {
has, err := s.c.Has(filterID1, msg)
s.NoError(err)
s.True(has)
has, err = s.c.Has(filterID2, msg)
s.NoError(err)
s.False(has)
has, err = s.c.Has(filterID3, msg)
s.NoError(err)
s.False(has)
}
messagesFilter2 := generateMessages(10)
s.NoError(s.c.Put(filterID2, messagesFilter2))
for _, msg := range messagesFilter2 {
has, err := s.c.Has(filterID1, msg)
s.NoError(err)
s.False(has)
has, err = s.c.Has(filterID2, msg)
s.NoError(err)
s.True(has)
has, err = s.c.Has(filterID3, msg)
s.NoError(err)
s.False(has)
}
}
func (s *DedupCacheTestSuite) TestCleaningUp() {
const filterID = "filter1-id"
// - 2 days
s.c.now = func() time.Time { return time.Now().Add(-48 * time.Hour) }
messages2DaysOld := generateMessages(10)
s.NoError(s.c.Put(filterID, messages2DaysOld))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
// - 1 days
s.c.now = func() time.Time { return time.Now().Add(-24 * time.Hour) }
messages1DayOld := generateMessages(10)
s.NoError(s.c.Put(filterID, messages1DayOld))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
for _, msg := range messages1DayOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
// now
s.c.now = time.Now
messagesToday := generateMessages(10)
s.NoError(s.c.Put(filterID, messagesToday))
for _, msg := range messages2DaysOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.False(has)
}
for _, msg := range messages1DayOld {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
for _, msg := range messagesToday {
has, err := s.c.Has(filterID, msg)
s.NoError(err)
s.True(has)
}
}

View File

@ -1,84 +0,0 @@
package dedup
import (
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/eth-node/types"
v1 "github.com/status-im/status-go/protocol/v1"
"github.com/syndtr/goleveldb/leveldb"
)
type keyPairProvider interface {
SelectedKeyPairID() string
}
// Deduplicator filters out already received messages for a current filter.
// It keeps a limited cache of the messages.
type Deduplicator struct {
keyPairProvider keyPairProvider
cache *cache
log log.Logger
}
type Author struct {
PublicKey types.HexBytes `json:"publicKey"`
Alias string `json:"alias"`
Identicon string `json:"identicon"`
}
type Metadata struct {
DedupID []byte `json:"dedupId"`
EncryptionID types.HexBytes `json:"encryptionId"`
MessageID types.HexBytes `json:"messageId"`
Author Author `json:"author"`
}
type DeduplicateMessage struct {
Message *types.Message `json:"message"`
Metadata Metadata `json:"metadata"`
Payload string `json:"payload"`
MessageType v1.StatusMessageT `json:"messageType"`
ParsedMessage interface{} `json:"parsedMessage"`
}
// NewDeduplicator creates a new deduplicator
func NewDeduplicator(keyPairProvider keyPairProvider, db *leveldb.DB) *Deduplicator {
return &Deduplicator{
log: log.New("package", "status-go/services/sshext.deduplicator"),
keyPairProvider: keyPairProvider,
cache: newCache(db),
}
}
// Deduplicate receives a list of whisper messages and
// returns the list of the messages that weren't filtered previously for the
// specified filter.
func (d *Deduplicator) Deduplicate(messages []*DeduplicateMessage) []*DeduplicateMessage {
result := make([]*DeduplicateMessage, 0)
selectedKeyPairID := d.keyPairProvider.SelectedKeyPairID()
for _, message := range messages {
if has, err := d.cache.Has(selectedKeyPairID, message.Message); !has {
if err != nil {
d.log.Error("error while deduplicating messages: search cache failed", "err", err)
}
message.Metadata.DedupID = d.cache.KeyToday(selectedKeyPairID, message.Message)
result = append(result, message)
}
}
return result
}
// AddMessages adds a message to the deduplicator DB, so it will be filtered
// out.
func (d *Deduplicator) AddMessagesByID(messageIDs [][]byte) error {
return d.cache.PutIDs(messageIDs)
}
// AddMessageByID adds a message to the deduplicator DB, so it will be filtered
// out.
func (d *Deduplicator) AddMessageByID(messageIDs [][]byte) error {
return d.cache.PutIDs(messageIDs)
}

View File

@ -1,148 +0,0 @@
package dedup
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
type dummyKeyPairProvider struct {
id string
}
func (p dummyKeyPairProvider) SelectedKeyPairID() string {
return p.id
}
func BenchmarkDeduplicate30000MessagesADay(b *testing.B) {
// using on-disk db here for real benchmarks
dir, err := ioutil.TempDir("", "dedup-30000")
if err != nil {
panic(err)
}
defer func() {
err := os.RemoveAll(dir)
if err != nil {
panic(err)
}
}()
db, err := leveldb.OpenFile(dir, nil)
if err != nil {
panic(err)
}
d := NewDeduplicator(dummyKeyPairProvider{}, db)
b.Log("generating messages")
messagesOld := generateDedupMessages(100000)
b.Log("generation is done")
// pre-fill deduplicator
d.Deduplicate(messagesOld[:1000])
b.ResetTimer()
length := 300
start := 1000
for n := 0; n < b.N; n++ {
if n%100 == 0 {
d.cache.now = func() time.Time { return time.Now().Add(time.Duration(24*(n/100)) * time.Hour) }
}
if (start + length) >= len(messagesOld) {
start = 0
fmt.Println("cycle!")
}
messages := messagesOld[start:(start + length)]
start += length
dedupMessages := d.Deduplicate(messages)
ids := make([][]byte, len(dedupMessages))
for i, m := range dedupMessages {
ids[i] = m.Metadata.DedupID
}
assert.NoError(b, d.AddMessagesByID(ids))
}
}
func TestDeduplicatorTestSuite(t *testing.T) {
suite.Run(t, new(DeduplicatorTestSuite))
}
type DeduplicatorTestSuite struct {
suite.Suite
d *Deduplicator
db *leveldb.DB
}
func (s *DeduplicatorTestSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
panic(err)
}
s.db = db
s.d = NewDeduplicator(dummyKeyPairProvider{}, db)
}
func (s *DeduplicatorTestSuite) TearDownTest() {
s.NoError(s.db.Close())
}
func (s *DeduplicatorTestSuite) TestDeduplicateSingleFilter() {
s.d.keyPairProvider = dummyKeyPairProvider{"acc1"}
messages1 := generateDedupMessages(10)
messages2 := generateDedupMessages(12)
result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
ids := make([][]byte, len(result))
for i, m := range result {
ids[i] = m.Metadata.DedupID
}
s.NoError(s.d.AddMessagesByID(ids))
result = s.d.Deduplicate(messages1)
s.Equal(0, len(result))
result = s.d.Deduplicate(messages2)
s.Equal(len(messages2), len(result))
ids = make([][]byte, len(result))
for i, m := range result {
ids[i] = m.Metadata.DedupID
}
s.NoError(s.d.AddMessagesByID(ids))
messages3 := append(messages2, generateDedupMessages(11)...)
result = s.d.Deduplicate(messages3)
s.Equal(11, len(result))
}
func (s *DeduplicatorTestSuite) TestDeduplicateMultipleFilters() {
messages1 := generateDedupMessages(10)
s.d.keyPairProvider = dummyKeyPairProvider{"acc1"}
result := s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
ids := make([][]byte, len(result))
for i, m := range result {
ids[i] = m.Metadata.DedupID
}
s.NoError(s.d.AddMessagesByID(ids))
result = s.d.Deduplicate(messages1)
s.Equal(0, len(result))
s.d.keyPairProvider = dummyKeyPairProvider{"acc2"}
result = s.d.Deduplicate(messages1)
s.Equal(len(messages1), len(result))
}

View File

@ -1,38 +0,0 @@
package dedup
import (
"crypto/rand"
"github.com/status-im/status-go/eth-node/types"
)
func generateMessages(count int) []*types.Message {
result := []*types.Message{}
for ; count > 0; count-- {
content := mustGenerateRandomBytes()
result = append(result, &types.Message{Payload: content})
}
return result
}
func generateDedupMessages(count int) []*DeduplicateMessage {
result := []*DeduplicateMessage{}
for ; count > 0; count-- {
content := mustGenerateRandomBytes()
result = append(result, &DeduplicateMessage{
Metadata: Metadata{},
Message: &types.Message{Payload: content},
})
}
return result
}
func mustGenerateRandomBytes() []byte {
c := 2048
b := make([]byte, c)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return b
}

View File

@ -19,7 +19,6 @@ import (
"github.com/status-im/status-go/db" "github.com/status-im/status-go/db"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext/dedup"
"github.com/status-im/status-go/services/shhext/mailservers" "github.com/status-im/status-go/services/shhext/mailservers"
"github.com/status-im/status-go/signal" "github.com/status-im/status-go/signal"
@ -48,6 +47,7 @@ type EnvelopeEventsHandler interface {
// Service is a service that provides some additional Whisper API. // Service is a service that provides some additional Whisper API.
type Service struct { type Service struct {
messenger *protocol.Messenger messenger *protocol.Messenger
identity *ecdsa.PrivateKey
cancelMessenger chan struct{} cancelMessenger chan struct{}
storage db.TransactionalStorage storage db.TransactionalStorage
@ -59,7 +59,6 @@ type Service struct {
historyUpdates *HistoryUpdateReactor historyUpdates *HistoryUpdateReactor
server *p2p.Server server *p2p.Server
nodeID *ecdsa.PrivateKey nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator
peerStore *mailservers.PeerStore peerStore *mailservers.PeerStore
cache *mailservers.Cache cache *mailservers.Cache
connManager *mailservers.ConnectionManager connManager *mailservers.ConnectionManager
@ -97,17 +96,27 @@ func New(n types.Node, ctx interface{}, handler EnvelopeEventsHandler, ldb *leve
mailMonitor: mailMonitor, mailMonitor: mailMonitor,
requestsRegistry: requestsRegistry, requestsRegistry: requestsRegistry,
historyUpdates: historyUpdates, historyUpdates: historyUpdates,
deduplicator: dedup.NewDeduplicator(w, ldb),
peerStore: ps, peerStore: ps,
cache: cache, cache: cache,
} }
} }
func (s *Service) InitProtocol(db *sql.DB) error { // nolint: gocyclo func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB) error { // nolint: gocyclo
if !s.config.PFSEnabled { if !s.config.PFSEnabled {
return nil return nil
} }
// If Messenger has been already set up, we need to shut it down
// before we init it again. Otherwise, it will lead to goroutines leakage
// due to not stopped filters.
if s.messenger != nil {
if err := s.messenger.Shutdown(); err != nil {
return err
}
}
s.identity = identity
dataDir := filepath.Clean(s.config.BackupDisabledDataDir) dataDir := filepath.Clean(s.config.BackupDisabledDataDir)
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
@ -131,12 +140,6 @@ func (s *Service) InitProtocol(db *sql.DB) error { // nolint: gocyclo
} }
options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, zapLogger) options := buildMessengerOptions(s.config, db, envelopesMonitorConfig, zapLogger)
selectedKeyID := s.w.SelectedKeyPairID()
identity, err := s.w.GetPrivateKey(selectedKeyID)
if err != nil {
return err
}
messenger, err := protocol.NewMessenger( messenger, err := protocol.NewMessenger(
identity, identity,
s.n, s.n,

View File

@ -122,8 +122,6 @@ func (s *ShhExtSuite) SetupTest() {
privateKey, err := crypto.GenerateKey() privateKey, err := crypto.GenerateKey()
s.NoError(err) s.NoError(err)
err = s.whisper[i].SelectKeyPair(privateKey)
s.NoError(err)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return gethbridge.GetGethWhisperFrom(s.whisperWrapper[i]), nil return gethbridge.GetGethWhisperFrom(s.whisperWrapper[i]), nil
@ -147,7 +145,7 @@ func (s *ShhExtSuite) SetupTest() {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/%d", tmpdir, i), "password") sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/%d", tmpdir, i), "password")
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(s.services[i].InitProtocol(sqlDB)) s.Require().NoError(s.services[i].InitProtocol(privateKey, sqlDB))
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return s.services[i], nil return s.services[i], nil
})) }))
@ -173,8 +171,6 @@ func (s *ShhExtSuite) TestInitProtocol() {
shh := gethbridge.NewGethWhisperWrapper(whisper.New(nil)) shh := gethbridge.NewGethWhisperWrapper(whisper.New(nil))
privateKey, err := crypto.GenerateKey() privateKey, err := crypto.GenerateKey()
s.Require().NoError(err) s.Require().NoError(err)
err = gethbridge.GetGethWhisperFrom(shh).SelectKeyPair(privateKey)
s.Require().NoError(err)
nodeWrapper := &testNodeWrapper{w: shh} nodeWrapper := &testNodeWrapper{w: shh}
service := New(nodeWrapper, nil, nil, db, config) service := New(nodeWrapper, nil, nil, db, config)
@ -185,7 +181,7 @@ func (s *ShhExtSuite) TestInitProtocol() {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password") sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
s.Require().NoError(err) s.Require().NoError(err)
err = service.InitProtocol(sqlDB) err = service.InitProtocol(privateKey, sqlDB)
s.NoError(err) s.NoError(err)
} }
@ -301,8 +297,6 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
shh := gethbridge.NewGethWhisperWrapper(whisper.New(nil)) shh := gethbridge.NewGethWhisperWrapper(whisper.New(nil))
privateKey, err := crypto.GenerateKey() privateKey, err := crypto.GenerateKey()
s.Require().NoError(err) s.Require().NoError(err)
err = gethbridge.GetGethWhisperFrom(shh).SelectKeyPair(privateKey)
s.Require().NoError(err)
aNode, err := node.New(&node.Config{ aNode, err := node.New(&node.Config{
P2P: p2p.Config{ P2P: p2p.Config{
MaxPeers: math.MaxInt32, MaxPeers: math.MaxInt32,
@ -333,7 +327,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password") sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(service.InitProtocol(sqlDB)) s.Require().NoError(service.InitProtocol(privateKey, sqlDB))
s.Require().NoError(service.Start(aNode.Server())) s.Require().NoError(service.Start(aNode.Server()))
api := NewPublicAPI(service) api := NewPublicAPI(service)
@ -599,7 +593,7 @@ func (s *RequestWithTrackingHistorySuite) SetupTest() {
sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password") sqlDB, err := sqlite.OpenDB(fmt.Sprintf("%s/db.sql", tmpdir), "password")
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(s.localService.InitProtocol(sqlDB)) s.Require().NoError(s.localService.InitProtocol(nil, sqlDB))
s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}})) s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}}))
s.localAPI = NewPublicAPI(s.localService) s.localAPI = NewPublicAPI(s.localService)

View File

@ -5,10 +5,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
statusproto "github.com/status-im/status-go/protocol" statusproto "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/services/shhext/dedup"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
statustransp "github.com/status-im/status-go/protocol/transport/whisper"
) )
const ( const (
@ -141,12 +139,6 @@ type EnodeDiscoveredSignal struct {
Topic string `json:"topic"` Topic string `json:"topic"`
} }
type Messages struct {
Error error `json:"error"`
Messages []*dedup.DeduplicateMessage `json:"messages"`
Chat statustransp.Filter `json:"chat"` // not a mistake, it's called chat in status-react
}
// SendEnodeDiscovered tiggered when an enode is discovered. // SendEnodeDiscovered tiggered when an enode is discovered.
// finds a new enode. // finds a new enode.
func SendEnodeDiscovered(enode, topic string) { func SendEnodeDiscovered(enode, topic string) {

View File

@ -144,6 +144,7 @@ func (s *WhisperTestSuite) TestLogout() {
// make sure that identity doesn't exist (yet) in Whisper // make sure that identity doesn't exist (yet) in Whisper
s.False(whisperService.HasKeyPair(accountInfo.ChatPubKey), "identity already present in whisper") s.False(whisperService.HasKeyPair(accountInfo.ChatPubKey), "identity already present in whisper")
// it should exist only after selecting an account
s.NoError(s.Backend.SelectAccount(buildLoginParams(accountInfo.WalletAddress, accountInfo.ChatAddress, TestConfig.Account1.Password))) s.NoError(s.Backend.SelectAccount(buildLoginParams(accountInfo.WalletAddress, accountInfo.ChatAddress, TestConfig.Account1.Password)))
s.True(whisperService.HasKeyPair(accountInfo.ChatPubKey), "identity not injected into whisper") s.True(whisperService.HasKeyPair(accountInfo.ChatPubKey), "identity not injected into whisper")
@ -187,17 +188,17 @@ func (s *WhisperTestSuite) TestSelectedAccountOnRestart() {
s.NoError(err) s.NoError(err)
selectedChatPubKey1 := types.EncodeHex(crypto.FromECDSAPub(&selectedChatAccount1.AccountKey.PrivateKey.PublicKey)) selectedChatPubKey1 := types.EncodeHex(crypto.FromECDSAPub(&selectedChatAccount1.AccountKey.PrivateKey.PublicKey))
s.Equal(selectedChatPubKey1, accountInfo1.ChatPubKey) s.Equal(selectedChatPubKey1, accountInfo1.ChatPubKey)
s.True(whisperService.HasKeyPair(selectedChatPubKey1), "identity not injected into whisper") s.True(whisperService.HasKeyPair(s.Backend.SelectedAccountShhKeyID()), "identity not injected into whisper")
// select another account, make sure that previous account is wiped out from Whisper cache // select another account, make sure that previous account is wiped out from Whisper cache
s.False(whisperService.HasKeyPair(accountInfo2.ChatPubKey), "identity already present in whisper") previousKeyID := s.Backend.SelectedAccountShhKeyID()
s.NoError(s.Backend.SelectAccount(buildLoginParams(accountInfo2.WalletAddress, accountInfo2.ChatAddress, TestConfig.Account2.Password))) s.NoError(s.Backend.SelectAccount(buildLoginParams(accountInfo2.WalletAddress, accountInfo2.ChatAddress, TestConfig.Account2.Password)))
selectedChatAccount2, err := s.Backend.AccountManager().SelectedChatAccount() selectedChatAccount2, err := s.Backend.AccountManager().SelectedChatAccount()
s.NoError(err) s.NoError(err)
selectedChatPubKey2 := types.EncodeHex(crypto.FromECDSAPub(&selectedChatAccount2.AccountKey.PrivateKey.PublicKey)) selectedChatPubKey2 := types.EncodeHex(crypto.FromECDSAPub(&selectedChatAccount2.AccountKey.PrivateKey.PublicKey))
s.Equal(selectedChatPubKey2, accountInfo2.ChatPubKey) s.Equal(selectedChatPubKey2, accountInfo2.ChatPubKey)
s.True(whisperService.HasKeyPair(selectedChatPubKey2), "identity not injected into whisper") s.True(whisperService.HasKeyPair(s.Backend.SelectedAccountShhKeyID()), "identity not injected into whisper")
s.False(whisperService.HasKeyPair(selectedChatPubKey1), "identity should be removed, but it is still present in whisper") s.False(whisperService.HasKeyPair(previousKeyID), "identity should be removed, but it is still present in whisper")
// stop node (and all of its sub-protocols) // stop node (and all of its sub-protocols)
nodeConfig := s.Backend.StatusNode().Config() nodeConfig := s.Backend.StatusNode().Config()
@ -220,23 +221,23 @@ func (s *WhisperTestSuite) TestSelectedAccountOnRestart() {
// make sure that Whisper gets identity re-injected // make sure that Whisper gets identity re-injected
whisperService = s.WhisperService() whisperService = s.WhisperService()
s.True(whisperService.HasKeyPair(selectedChatPubKey2), "identity not injected into whisper") s.True(whisperService.HasKeyPair(s.Backend.SelectedAccountShhKeyID()), "identity not injected into whisper")
s.False(whisperService.HasKeyPair(selectedChatPubKey1), "identity should not be present, but it is still present in whisper") s.False(whisperService.HasKeyPair(previousKeyID), "identity should not be present, but it is still present in whisper")
// now restart node using RestartNode() method, and make sure that account is still available // now restart node using RestartNode() method, and make sure that account is still available
s.RestartTestNode() s.RestartTestNode()
defer s.StopTestBackend() defer s.StopTestBackend()
whisperService = s.WhisperService() whisperService = s.WhisperService()
s.True(whisperService.HasKeyPair(selectedChatPubKey2), "identity not injected into whisper") s.True(whisperService.HasKeyPair(s.Backend.SelectedAccountShhKeyID()), "identity not injected into whisper")
s.False(whisperService.HasKeyPair(selectedChatPubKey1), "identity should not be present, but it is still present in whisper") s.False(whisperService.HasKeyPair(previousKeyID), "identity should not be present, but it is still present in whisper")
// now logout, and make sure that on restart no account is selected (i.e. logout works properly) // now logout, and make sure that on restart no account is selected (i.e. logout works properly)
s.NoError(s.Backend.Logout()) s.NoError(s.Backend.Logout())
s.RestartTestNode() s.RestartTestNode()
whisperService = s.WhisperService() whisperService = s.WhisperService()
s.False(whisperService.HasKeyPair(selectedChatPubKey2), "identity not injected into whisper") s.False(whisperService.HasKeyPair(s.Backend.SelectedAccountShhKeyID()), "identity not injected into whisper")
s.False(whisperService.HasKeyPair(selectedChatPubKey1), "identity should not be present, but it is still present in whisper") s.False(whisperService.HasKeyPair(previousKeyID), "identity should not be present, but it is still present in whisper")
selectedWalletAccount, err = s.Backend.AccountManager().MainAccountAddress() selectedWalletAccount, err = s.Backend.AccountManager().MainAccountAddress()
s.EqualError(account.ErrNoAccountSelected, err.Error()) s.EqualError(account.ErrNoAccountSelected, err.Error())

View File

@ -633,24 +633,6 @@ func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
return id, nil return id, nil
} }
// SelectKeyPair adds cryptographic identity, and makes sure
// that it is the only private key known to the node.
func (whisper *Whisper) SelectKeyPair(key *ecdsa.PrivateKey) error {
id, err := makeDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize)
if err != nil {
return err
}
whisper.keyMu.Lock()
defer whisper.keyMu.Unlock()
whisper.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store
whisper.privateKeys[id] = key
log.Info("Whisper identity selected", "id", id, "key", common.ToHex(crypto.FromECDSAPub(&key.PublicKey)))
return nil
}
// DeleteKeyPairs removes all cryptographic identities known to the node // DeleteKeyPairs removes all cryptographic identities known to the node
func (whisper *Whisper) DeleteKeyPairs() error { func (whisper *Whisper) DeleteKeyPairs() error {
whisper.keyMu.Lock() whisper.keyMu.Lock()
@ -1608,15 +1590,3 @@ func addBloom(a, b []byte) []byte {
} }
return c return c
} }
// SelectedKeyPairID returns the id of currently selected key pair.
// It helps distinguish between different users w/o exposing the user identity itself.
func (whisper *Whisper) SelectedKeyPairID() string {
whisper.keyMu.RLock()
defer whisper.keyMu.RUnlock()
for id := range whisper.privateKeys {
return id
}
return ""
}