From fa639ffc7473ad8772e08478fe9d75c628f04c67 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Thu, 4 Jul 2019 12:22:57 +0200 Subject: [PATCH] Handle device not found When receiving a message from someone not targeting our device, we reply with an empty message that includes our own devices, so next time they send a message they will include our device. --- VERSION | 2 +- messaging/db/migrations/bindata.go | 8 ++-- messaging/publisher/persistence.go | 35 ++++++++++++--- messaging/publisher/persistence_test.go | 57 +++++++++++++++++++++++++ messaging/publisher/publisher.go | 43 ++++++++++++++++++- services/shhext/service.go | 12 +----- 6 files changed, 133 insertions(+), 24 deletions(-) create mode 100644 messaging/publisher/persistence_test.go diff --git a/VERSION b/VERSION index 999ebbb45..f1346caac 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.30.0-alpha.1 +0.30.0-beta.1 diff --git a/messaging/db/migrations/bindata.go b/messaging/db/migrations/bindata.go index 1f2583249..0e9116d92 100644 --- a/messaging/db/migrations/bindata.go +++ b/messaging/db/migrations/bindata.go @@ -400,7 +400,7 @@ func _1561368210_add_installation_metadataDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1561368210_add_installation_metadata.down.sql", size: 35, mode: os.FileMode(420), modTime: time.Unix(1561459323, 0)} + info := bindataFileInfo{name: "1561368210_add_installation_metadata.down.sql", size: 35, mode: os.FileMode(420), modTime: time.Unix(1561969196, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -420,12 +420,12 @@ func _1561368210_add_installation_metadataUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1561368210_add_installation_metadata.up.sql", size: 267, mode: os.FileMode(420), modTime: time.Unix(1561459769, 0)} + info := bindataFileInfo{name: "1561368210_add_installation_metadata.up.sql", size: 267, mode: os.FileMode(420), modTime: time.Unix(1561969196, 0)} a := &asset{bytes: bytes, info: info} return a, nil } -var _staticGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xcc\x41\x6a\x03\x31\x0c\x46\xe1\xbd\x4f\xf1\x2f\x5b\xe8\x58\xfb\x9e\xa0\x94\x16\x0a\xcd\x05\x64\x8f\x90\xc5\x30\xf6\x60\x29\x21\xc7\xcf\x26\x21\x64\xf9\xe0\xf1\x11\xe1\x8f\xeb\xc6\x2a\xf0\xe0\xb0\x0a\xd9\x8b\xac\xfe\xa8\xb7\xef\xff\x0f\x7c\x9d\x7e\x7f\xde\x31\xc5\xc7\x79\x56\x71\x4c\xd3\x16\xb0\x1e\x03\xd1\x04\xc5\x3a\x4f\x13\x4f\xc7\x8b\x94\x12\x91\x8e\x4f\x95\x2e\x93\x43\xa0\x63\x29\xd6\x57\x0e\xc6\x72\x6c\x8a\xdd\x74\x72\xd8\xe8\x8e\x65\x20\x67\xca\x99\x5c\xe6\xc5\xaa\x38\x79\x6b\x72\x0d\xaa\x8d\x83\xd6\x42\xcf\x97\xee\x46\xd6\x81\x9c\x6e\x01\x00\x00\xff\xff\x6c\x21\xbf\x7a\xbf\x00\x00\x00") +var _staticGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x8c\x41\x6a\xc3\x40\x0c\x45\xf7\x73\x8a\xbf\x6c\xa1\x1e\xed\x7b\x82\x52\x12\x08\x24\x17\xd0\xd8\x42\x16\xc6\x23\x33\x52\xee\x9f\x4d\x42\xc8\xf2\xc1\x7b\x8f\x08\x17\x9e\x37\x56\x41\x24\xa7\xcd\x90\xbd\xc9\x12\x2f\xfa\xfa\xbf\xfe\xe0\xef\x76\x3e\x7d\x63\x48\xf8\x7d\xcc\x12\x18\xa6\x6b\xc2\x7a\x3a\x72\x15\x34\xeb\x3c\x4c\xa2\x1c\x1f\xa7\x52\x88\xd4\x7f\x55\xba\x0c\x4e\x81\xfa\xd4\xac\x2f\x9c\x8c\xe9\xd8\x14\xbb\xe9\xe0\x34\xef\x81\xc9\x51\x2b\xd5\x4a\xbb\x44\xb0\x5a\x57\x5a\x1a\xbd\x05\x7a\x86\x55\x1d\xb5\x3c\x02\x00\x00\xff\xff\x97\x9b\xee\x8f\xb4\x00\x00\x00") func staticGoBytes() ([]byte, error) { return bindataRead( @@ -440,7 +440,7 @@ func staticGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "static.go", size: 191, mode: os.FileMode(420), modTime: time.Unix(1561038914, 0)} + info := bindataFileInfo{name: "static.go", size: 180, mode: os.FileMode(420), modTime: time.Unix(1562216541, 0)} a := &asset{bytes: bytes, info: info} return a, nil } diff --git a/messaging/publisher/persistence.go b/messaging/publisher/persistence.go index 722a60a82..9da063402 100644 --- a/messaging/publisher/persistence.go +++ b/messaging/publisher/persistence.go @@ -2,22 +2,32 @@ package publisher import ( "database/sql" + "fmt" + "sync" ) type Persistence interface { - Get() (int64, error) - Set(int64) error + GetLastPublished() (int64, error) + SetLastPublished(int64) error + GetLastAcked(identity []byte) (int64, error) + SetLastAcked(identity []byte, lastAcked int64) error } type SQLLitePersistence struct { - db *sql.DB + db *sql.DB + lastAcked map[string]int64 + lastAckedMutex sync.Mutex } func NewSQLLitePersistence(db *sql.DB) *SQLLitePersistence { - return &SQLLitePersistence{db: db} + return &SQLLitePersistence{ + db: db, + lastAcked: make(map[string]int64), + lastAckedMutex: sync.Mutex{}, + } } -func (s *SQLLitePersistence) Get() (int64, error) { +func (s *SQLLitePersistence) GetLastPublished() (int64, error) { var lastPublished int64 statement := "SELECT last_published FROM contact_code_config LIMIT 1" err := s.db.QueryRow(statement).Scan(&lastPublished) @@ -29,7 +39,7 @@ func (s *SQLLitePersistence) Get() (int64, error) { return lastPublished, nil } -func (s *SQLLitePersistence) Set(lastPublished int64) error { +func (s *SQLLitePersistence) SetLastPublished(lastPublished int64) error { statement := "UPDATE contact_code_config SET last_published = ?" stmt, err := s.db.Prepare(statement) if err != nil { @@ -40,3 +50,16 @@ func (s *SQLLitePersistence) Set(lastPublished int64) error { _, err = stmt.Exec(lastPublished) return err } + +func (s *SQLLitePersistence) GetLastAcked(identity []byte) (int64, error) { + s.lastAckedMutex.Lock() + defer s.lastAckedMutex.Unlock() + return s.lastAcked[fmt.Sprintf("%x", identity)], nil +} + +func (s *SQLLitePersistence) SetLastAcked(identity []byte, lastAcked int64) error { + s.lastAckedMutex.Lock() + defer s.lastAckedMutex.Unlock() + s.lastAcked[fmt.Sprintf("%x", identity)] = lastAcked + return nil +} diff --git a/messaging/publisher/persistence_test.go b/messaging/publisher/persistence_test.go new file mode 100644 index 000000000..3cf505adb --- /dev/null +++ b/messaging/publisher/persistence_test.go @@ -0,0 +1,57 @@ +package publisher + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/status-im/status-go/messaging/chat" + "github.com/stretchr/testify/suite" +) + +func TestPersistenceTestSuite(t *testing.T) { + suite.Run(t, new(PersistenceTestSuite)) +} + +type PersistenceTestSuite struct { + suite.Suite + persistence Persistence +} + +func (s *PersistenceTestSuite) SetupTest() { + dir, err := ioutil.TempDir("", "publisher-persistence-test") + s.Require().NoError(err) + + p, err := chat.NewSQLLitePersistence(filepath.Join(dir, "db1.sql"), "pass") + s.Require().NoError(err) + + s.persistence = NewSQLLitePersistence(p.DB) +} + +func (s *PersistenceTestSuite) TestLastAcked() { + identity := []byte("identity") + // Nothing in the database + lastAcked1, err := s.persistence.GetLastAcked(identity) + s.Require().NoError(err) + s.Require().Equal(int64(0), lastAcked1) + + err = s.persistence.SetLastAcked(identity, 3) + s.Require().NoError(err) + + lastAcked2, err := s.persistence.GetLastAcked(identity) + s.Require().NoError(err) + s.Require().Equal(int64(3), lastAcked2) +} + +func (s *PersistenceTestSuite) TestLastPublished() { + lastPublished1, err := s.persistence.GetLastPublished() + s.Require().NoError(err) + s.Require().Equal(int64(0), lastPublished1) + + err = s.persistence.SetLastPublished(3) + s.Require().NoError(err) + + lastPublished2, err := s.persistence.GetLastPublished() + s.Require().NoError(err) + s.Require().Equal(int64(3), lastPublished2) +} diff --git a/messaging/publisher/publisher.go b/messaging/publisher/publisher.go index f9d6f7cd6..fb5862bfc 100644 --- a/messaging/publisher/publisher.go +++ b/messaging/publisher/publisher.go @@ -30,6 +30,8 @@ const ( publishInterval = 21600 // How often we should check for new messages pollIntervalMs = 300 + // Cooldown period on acking messages when not targeting our device + deviceNotFoundAckInterval = 7200 ) var ( @@ -256,7 +258,15 @@ func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error { response, err := p.protocol.HandleMessage(privateKey, publicKey, protocolMessage, msgID) if err == nil { msg.Payload = response + } else if err == chat.ErrDeviceNotFound { + if err := p.handleDeviceNotFound(privateKey, publicKey); err != nil { + p.log.Error("Failed to handle DeviceNotFound", "err", err) + } + + // Return the original error + return err } + return err } @@ -427,7 +437,7 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) { return nil, nil } - lastPublished, err := p.persistence.Get() + lastPublished, err := p.persistence.GetLastPublished() if err != nil { p.log.Error("could not fetch config from db", "err", err) return nil, err @@ -469,7 +479,7 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) { return nil, err } - err = p.persistence.Set(now) + err = p.persistence.SetLastPublished(now) if err != nil { p.log.Error("could not set last published", "err", err) return nil, err @@ -477,3 +487,32 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) { return message, nil } + +// handleDeviceNotFound sends an empty message to publicKey containing our bundle information +// so it's notified of our devices +func (p *Publisher) handleDeviceNotFound(privateKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error { + now := time.Now().Unix() + identity := crypto.CompressPubkey(publicKey) + + lastAcked, err := p.persistence.GetLastAcked(identity) + if err != nil { + return err + } + + if now-lastAcked < deviceNotFoundAckInterval { + p.log.Debug("already acked identity", "identity", identity, "lastAcked", lastAcked) + return nil + } + + message, err := p.CreateDirectMessage(privateKey, publicKey, true, nil) + if err != nil { + return err + } + + _, err = p.whisperAPI.Post(context.TODO(), *message) + if err != nil { + return err + } + + return p.persistence.SetLastAcked(identity, now) +} diff --git a/services/shhext/service.go b/services/shhext/service.go index df10b05d5..69d0271b4 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -9,7 +9,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -229,16 +228,7 @@ func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup. case chat.ErrNotPairedDevice: log.Info("Received a message from non-paired device", "err", err) case chat.ErrDeviceNotFound: - log.Warn("Device not found, sending signal", "err", err) - - publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig) - if err != nil { - return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err) - } - - keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey)) - handler := PublisherSignalHandler{} - handler.DecryptMessageFailed(keyString) + log.Warn("Received a message not targeted to us", "err", err) default: if err != nil { log.Error("Failed handling message with error", "err", err)