Handle device not found
When receiving a message from someone not targeting our device, we reply with an empty message that includes our own devices, so next time they send a message they will include our device.
This commit is contained in:
parent
5ee7034f97
commit
fa639ffc74
|
@ -400,7 +400,7 @@ func _1561368210_add_installation_metadataDownSql() (*asset, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := bindataFileInfo{name: "1561368210_add_installation_metadata.down.sql", size: 35, mode: os.FileMode(420), modTime: time.Unix(1561459323, 0)}
|
info := bindataFileInfo{name: "1561368210_add_installation_metadata.down.sql", size: 35, mode: os.FileMode(420), modTime: time.Unix(1561969196, 0)}
|
||||||
a := &asset{bytes: bytes, info: info}
|
a := &asset{bytes: bytes, info: info}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
@ -420,12 +420,12 @@ func _1561368210_add_installation_metadataUpSql() (*asset, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := bindataFileInfo{name: "1561368210_add_installation_metadata.up.sql", size: 267, mode: os.FileMode(420), modTime: time.Unix(1561459769, 0)}
|
info := bindataFileInfo{name: "1561368210_add_installation_metadata.up.sql", size: 267, mode: os.FileMode(420), modTime: time.Unix(1561969196, 0)}
|
||||||
a := &asset{bytes: bytes, info: info}
|
a := &asset{bytes: bytes, info: info}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _staticGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\xcc\x41\x6a\x03\x31\x0c\x46\xe1\xbd\x4f\xf1\x2f\x5b\xe8\x58\xfb\x9e\xa0\x94\x16\x0a\xcd\x05\x64\x8f\x90\xc5\x30\xf6\x60\x29\x21\xc7\xcf\x26\x21\x64\xf9\xe0\xf1\x11\xe1\x8f\xeb\xc6\x2a\xf0\xe0\xb0\x0a\xd9\x8b\xac\xfe\xa8\xb7\xef\xff\x0f\x7c\x9d\x7e\x7f\xde\x31\xc5\xc7\x79\x56\x71\x4c\xd3\x16\xb0\x1e\x03\xd1\x04\xc5\x3a\x4f\x13\x4f\xc7\x8b\x94\x12\x91\x8e\x4f\x95\x2e\x93\x43\xa0\x63\x29\xd6\x57\x0e\xc6\x72\x6c\x8a\xdd\x74\x72\xd8\xe8\x8e\x65\x20\x67\xca\x99\x5c\xe6\xc5\xaa\x38\x79\x6b\x72\x0d\xaa\x8d\x83\xd6\x42\xcf\x97\xee\x46\xd6\x81\x9c\x6e\x01\x00\x00\xff\xff\x6c\x21\xbf\x7a\xbf\x00\x00\x00")
|
var _staticGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x8c\x41\x6a\xc3\x40\x0c\x45\xf7\x73\x8a\xbf\x6c\xa1\x1e\xed\x7b\x82\x52\x12\x08\x24\x17\xd0\xd8\x42\x16\xc6\x23\x33\x52\xee\x9f\x4d\x42\xc8\xf2\xc1\x7b\x8f\x08\x17\x9e\x37\x56\x41\x24\xa7\xcd\x90\xbd\xc9\x12\x2f\xfa\xfa\xbf\xfe\xe0\xef\x76\x3e\x7d\x63\x48\xf8\x7d\xcc\x12\x18\xa6\x6b\xc2\x7a\x3a\x72\x15\x34\xeb\x3c\x4c\xa2\x1c\x1f\xa7\x52\x88\xd4\x7f\x55\xba\x0c\x4e\x81\xfa\xd4\xac\x2f\x9c\x8c\xe9\xd8\x14\xbb\xe9\xe0\x34\xef\x81\xc9\x51\x2b\xd5\x4a\xbb\x44\xb0\x5a\x57\x5a\x1a\xbd\x05\x7a\x86\x55\x1d\xb5\x3c\x02\x00\x00\xff\xff\x97\x9b\xee\x8f\xb4\x00\x00\x00")
|
||||||
|
|
||||||
func staticGoBytes() ([]byte, error) {
|
func staticGoBytes() ([]byte, error) {
|
||||||
return bindataRead(
|
return bindataRead(
|
||||||
|
@ -440,7 +440,7 @@ func staticGo() (*asset, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := bindataFileInfo{name: "static.go", size: 191, mode: os.FileMode(420), modTime: time.Unix(1561038914, 0)}
|
info := bindataFileInfo{name: "static.go", size: 180, mode: os.FileMode(420), modTime: time.Unix(1562216541, 0)}
|
||||||
a := &asset{bytes: bytes, info: info}
|
a := &asset{bytes: bytes, info: info}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,22 +2,32 @@ package publisher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Persistence interface {
|
type Persistence interface {
|
||||||
Get() (int64, error)
|
GetLastPublished() (int64, error)
|
||||||
Set(int64) error
|
SetLastPublished(int64) error
|
||||||
|
GetLastAcked(identity []byte) (int64, error)
|
||||||
|
SetLastAcked(identity []byte, lastAcked int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type SQLLitePersistence struct {
|
type SQLLitePersistence struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
lastAcked map[string]int64
|
||||||
|
lastAckedMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSQLLitePersistence(db *sql.DB) *SQLLitePersistence {
|
func NewSQLLitePersistence(db *sql.DB) *SQLLitePersistence {
|
||||||
return &SQLLitePersistence{db: db}
|
return &SQLLitePersistence{
|
||||||
|
db: db,
|
||||||
|
lastAcked: make(map[string]int64),
|
||||||
|
lastAckedMutex: sync.Mutex{},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLLitePersistence) Get() (int64, error) {
|
func (s *SQLLitePersistence) GetLastPublished() (int64, error) {
|
||||||
var lastPublished int64
|
var lastPublished int64
|
||||||
statement := "SELECT last_published FROM contact_code_config LIMIT 1"
|
statement := "SELECT last_published FROM contact_code_config LIMIT 1"
|
||||||
err := s.db.QueryRow(statement).Scan(&lastPublished)
|
err := s.db.QueryRow(statement).Scan(&lastPublished)
|
||||||
|
@ -29,7 +39,7 @@ func (s *SQLLitePersistence) Get() (int64, error) {
|
||||||
return lastPublished, nil
|
return lastPublished, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLLitePersistence) Set(lastPublished int64) error {
|
func (s *SQLLitePersistence) SetLastPublished(lastPublished int64) error {
|
||||||
statement := "UPDATE contact_code_config SET last_published = ?"
|
statement := "UPDATE contact_code_config SET last_published = ?"
|
||||||
stmt, err := s.db.Prepare(statement)
|
stmt, err := s.db.Prepare(statement)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -40,3 +50,16 @@ func (s *SQLLitePersistence) Set(lastPublished int64) error {
|
||||||
_, err = stmt.Exec(lastPublished)
|
_, err = stmt.Exec(lastPublished)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SQLLitePersistence) GetLastAcked(identity []byte) (int64, error) {
|
||||||
|
s.lastAckedMutex.Lock()
|
||||||
|
defer s.lastAckedMutex.Unlock()
|
||||||
|
return s.lastAcked[fmt.Sprintf("%x", identity)], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQLLitePersistence) SetLastAcked(identity []byte, lastAcked int64) error {
|
||||||
|
s.lastAckedMutex.Lock()
|
||||||
|
defer s.lastAckedMutex.Unlock()
|
||||||
|
s.lastAcked[fmt.Sprintf("%x", identity)] = lastAcked
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
package publisher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/messaging/chat"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPersistenceTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(PersistenceTestSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
type PersistenceTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
persistence Persistence
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PersistenceTestSuite) SetupTest() {
|
||||||
|
dir, err := ioutil.TempDir("", "publisher-persistence-test")
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
p, err := chat.NewSQLLitePersistence(filepath.Join(dir, "db1.sql"), "pass")
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
s.persistence = NewSQLLitePersistence(p.DB)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PersistenceTestSuite) TestLastAcked() {
|
||||||
|
identity := []byte("identity")
|
||||||
|
// Nothing in the database
|
||||||
|
lastAcked1, err := s.persistence.GetLastAcked(identity)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().Equal(int64(0), lastAcked1)
|
||||||
|
|
||||||
|
err = s.persistence.SetLastAcked(identity, 3)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
lastAcked2, err := s.persistence.GetLastAcked(identity)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().Equal(int64(3), lastAcked2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PersistenceTestSuite) TestLastPublished() {
|
||||||
|
lastPublished1, err := s.persistence.GetLastPublished()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().Equal(int64(0), lastPublished1)
|
||||||
|
|
||||||
|
err = s.persistence.SetLastPublished(3)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
lastPublished2, err := s.persistence.GetLastPublished()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
s.Require().Equal(int64(3), lastPublished2)
|
||||||
|
}
|
|
@ -30,6 +30,8 @@ const (
|
||||||
publishInterval = 21600
|
publishInterval = 21600
|
||||||
// How often we should check for new messages
|
// How often we should check for new messages
|
||||||
pollIntervalMs = 300
|
pollIntervalMs = 300
|
||||||
|
// Cooldown period on acking messages when not targeting our device
|
||||||
|
deviceNotFoundAckInterval = 7200
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -256,7 +258,15 @@ func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error {
|
||||||
response, err := p.protocol.HandleMessage(privateKey, publicKey, protocolMessage, msgID)
|
response, err := p.protocol.HandleMessage(privateKey, publicKey, protocolMessage, msgID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
msg.Payload = response
|
msg.Payload = response
|
||||||
|
} else if err == chat.ErrDeviceNotFound {
|
||||||
|
if err := p.handleDeviceNotFound(privateKey, publicKey); err != nil {
|
||||||
|
p.log.Error("Failed to handle DeviceNotFound", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the original error
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -427,7 +437,7 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPublished, err := p.persistence.Get()
|
lastPublished, err := p.persistence.GetLastPublished()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error("could not fetch config from db", "err", err)
|
p.log.Error("could not fetch config from db", "err", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -469,7 +479,7 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = p.persistence.Set(now)
|
err = p.persistence.SetLastPublished(now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error("could not set last published", "err", err)
|
p.log.Error("could not set last published", "err", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -477,3 +487,32 @@ func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
|
||||||
|
|
||||||
return message, nil
|
return message, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleDeviceNotFound sends an empty message to publicKey containing our bundle information
|
||||||
|
// so it's notified of our devices
|
||||||
|
func (p *Publisher) handleDeviceNotFound(privateKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error {
|
||||||
|
now := time.Now().Unix()
|
||||||
|
identity := crypto.CompressPubkey(publicKey)
|
||||||
|
|
||||||
|
lastAcked, err := p.persistence.GetLastAcked(identity)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if now-lastAcked < deviceNotFoundAckInterval {
|
||||||
|
p.log.Debug("already acked identity", "identity", identity, "lastAcked", lastAcked)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
message, err := p.CreateDirectMessage(privateKey, publicKey, true, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = p.whisperAPI.Post(context.TODO(), *message)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.persistence.SetLastAcked(identity, now)
|
||||||
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
|
@ -229,16 +228,7 @@ func (s *Service) processReceivedMessages(messages []*whisper.Message) ([]dedup.
|
||||||
case chat.ErrNotPairedDevice:
|
case chat.ErrNotPairedDevice:
|
||||||
log.Info("Received a message from non-paired device", "err", err)
|
log.Info("Received a message from non-paired device", "err", err)
|
||||||
case chat.ErrDeviceNotFound:
|
case chat.ErrDeviceNotFound:
|
||||||
log.Warn("Device not found, sending signal", "err", err)
|
log.Warn("Received a message not targeted to us", "err", err)
|
||||||
|
|
||||||
publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey))
|
|
||||||
handler := PublisherSignalHandler{}
|
|
||||||
handler.DecryptMessageFailed(keyString)
|
|
||||||
default:
|
default:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed handling message with error", "err", err)
|
log.Error("Failed handling message with error", "err", err)
|
||||||
|
|
Loading…
Reference in New Issue