Request part of messages that wasn't downloaded previously (#42)

This commit is contained in:
Dmitry Shulyak 2019-05-22 19:55:34 +03:00 committed by GitHub
parent 220bf05e8d
commit 59579bca82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 330 additions and 30 deletions

16
main.go
View File

@ -134,8 +134,20 @@ func main() {
{Name: "testing-adamb", Type: client.ContactPublicRoom, Topic: "testing-adamb"},
adambContact,
}
if err := db.SaveContacts(debugContacts); err != nil {
exitErr(err)
uniqueContacts := []client.Contact{}
for _, c := range debugContacts {
exist, err := db.ContactExist(c)
if err != nil {
exitErr(err)
}
if !exist {
uniqueContacts = append(uniqueContacts, c)
}
}
if len(uniqueContacts) != 0 {
if err := db.SaveContacts(uniqueContacts); err != nil {
exitErr(err)
}
}
}
go func() {

View File

@ -71,7 +71,10 @@ type Database interface {
Contacts() ([]Contact, error)
SaveContacts(contacts []Contact) error
DeleteContact(Contact) error
ContactExist(Contact) (bool, error)
PublicContactExist(Contact) (bool, error)
Histories() ([]History, error)
UpdateHistories([]History) error
}
// Migrate applies migrations.
@ -176,7 +179,11 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) {
if err != nil {
return err
}
stmt, err = tx.Prepare("INSERT OR REPLACE INTO user_contacts(id, name, type, state, topic, public_key) VALUES (?, ?, ?, ?, ?, ?)")
stmt, err = tx.Prepare("INSERT INTO user_contacts(id, name, type, state, topic, public_key) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
history, err := tx.Prepare("INSERT INTO history_user_contact_topic(contact_id) VALUES (?)")
if err != nil {
return err
}
@ -195,11 +202,16 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) {
return err
}
}
id := fmt.Sprintf("%s:%d", contacts[i].Name, contacts[i].Type)
id := contactID(contacts[i])
_, err = stmt.Exec(id, contacts[i].Name, contacts[i].Type, contacts[i].State, contacts[i].Topic, pkey)
if err != nil {
return err
}
// to avoid unmarshalling null into sql.NullInt64
_, err = history.Exec(id)
if err != nil {
return err
}
}
return err
}
@ -234,6 +246,61 @@ func (db SQLLiteDatabase) Contacts() ([]Contact, error) {
return rst, nil
}
func (db SQLLiteDatabase) Histories() ([]History, error) {
rows, err := db.db.Query("SELECT synced, u.name, u.type, u.state, u.topic, u.public_key FROM history_user_contact_topic JOIN user_contacts u ON contact_id = u.id")
if err != nil {
return nil, err
}
rst := []History{}
for rows.Next() {
h := History{
Contact: Contact{},
}
pkey := []byte{}
err = rows.Scan(&h.Synced, &h.Contact.Name, &h.Contact.Type, &h.Contact.State, &h.Contact.Topic, &pkey)
if err != nil {
return nil, err
}
if len(pkey) != 0 {
h.Contact.PublicKey, err = unmarshalEcdsaPub(pkey)
if err != nil {
return nil, err
}
}
rst = append(rst, h)
}
return rst, nil
}
func (db SQLLiteDatabase) UpdateHistories(histories []History) (err error) {
var (
tx *sql.Tx
stmt *sql.Stmt
)
tx, err = db.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return err
}
stmt, err = tx.Prepare("UPDATE history_user_contact_topic SET synced = ? WHERE contact_Id = ?")
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
for i := range histories {
_, err = stmt.Exec(histories[i].Synced, contactID(histories[i].Contact))
if err != nil {
return err
}
}
return nil
}
func (db SQLLiteDatabase) DeleteContact(c Contact) error {
_, err := db.db.Exec("DELETE FROM user_contacts WHERE id = ?", fmt.Sprintf("%s:%d", c.Name, c.Type))
if err != nil {
@ -242,6 +309,11 @@ func (db SQLLiteDatabase) DeleteContact(c Contact) error {
return nil
}
func (db SQLLiteDatabase) ContactExist(c Contact) (exists bool, err error) {
err = db.db.QueryRow("SELECT EXISTS(SELECT id FROM user_contacts WHERE id = ?)", contactID(c)).Scan(&exists)
return
}
func (db SQLLiteDatabase) PublicContactExist(c Contact) (exists bool, err error) {
var pkey []byte
if c.PublicKey != nil {
@ -253,7 +325,7 @@ func (db SQLLiteDatabase) PublicContactExist(c Contact) (exists bool, err error)
return false, errors.New("no public key")
}
err = db.db.QueryRow("SELECT EXISTS(SELECT id FROM user_contacts WHERE public_key = ?)", pkey).Scan(&exists)
return exists, err
return
}
func (db SQLLiteDatabase) LastMessageClock(c Contact) (int64, error) {

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestContactReplacedBySameName(t *testing.T) {
func TestContactUniqueConstraint(t *testing.T) {
db, err := InitializeTmpDB()
require.NoError(t, err)
defer db.Close()
@ -23,7 +23,7 @@ func TestContactReplacedBySameName(t *testing.T) {
Topic: "first",
}
require.NoError(t, db.SaveContacts([]Contact{contact}))
require.NoError(t, db.SaveContacts([]Contact{contact}))
require.EqualError(t, db.SaveContacts([]Contact{contact}), "UNIQUE constraint failed: user_contacts.id")
rst, err := db.Contacts()
require.NoError(t, err)
require.Len(t, rst, 1)
@ -139,6 +139,44 @@ func TestPublicContactExist(t *testing.T) {
require.True(t, exists, "contact expected to exist in database")
}
func TestLoadHistories(t *testing.T) {
db, err := InitializeTmpDB()
require.NoError(t, err)
defer db.Close()
c1 := Contact{
Name: "first",
Type: ContactPublicRoom,
}
c2 := Contact{
Name: "second",
Type: ContactPublicRoom,
}
require.NoError(t, db.SaveContacts([]Contact{c1, c2}))
histories, err := db.Histories()
require.NoError(t, err)
require.Len(t, histories, 2)
}
func TestUpdateHistories(t *testing.T) {
db, err := InitializeTmpDB()
require.NoError(t, err)
defer db.Close()
c1 := Contact{
Name: "first",
Type: ContactPublicRoom,
}
require.NoError(t, db.SaveContacts([]Contact{c1}))
h := History{
Synced: 100,
Contact: c1,
}
require.NoError(t, db.UpdateHistories([]History{h}))
histories, err := db.Histories()
require.NoError(t, err)
require.Len(t, histories, 1)
require.Equal(t, h.Synced, histories[0].Synced)
}
func BenchmarkLoadMessages(b *testing.B) {
db, err := InitializeTmpDB()
require.NoError(b, err)

View File

@ -0,0 +1,53 @@
package client
import (
"time"
"github.com/status-im/status-console-client/protocol/v1"
)
// History is used to track when contact was synced last time.
// Contact extension. Deleted on cascade when parent contact is deleted.
type History struct {
// Synced is a timestamp in seconds.
Synced int64
Contact Contact
}
func splitIntoSyncedNotSynced(histories []History) (sync []History, nosync []History) {
for i := range histories {
if histories[i].Synced != 0 {
sync = append(sync, histories[i])
} else {
nosync = append(nosync, histories[i])
}
}
return
}
func syncedToOpts(histories []History, now time.Time) protocol.RequestOptions {
opts := protocol.RequestOptions{
To: now.Unix(),
Limit: 1000,
}
for i := range histories {
if opts.From == 0 || opts.From > histories[i].Synced {
opts.From = histories[i].Synced
}
// TODO(dshulyak) remove contact type validation in that function
// simply always add topic and (if set) public key
_ = enhanceRequestOptions(histories[i].Contact, &opts)
}
return opts
}
func notsyncedToOpts(histories []History, now time.Time) protocol.RequestOptions {
opts := protocol.DefaultRequestOptions()
opts.To = now.Unix()
for i := range histories {
// TODO(dshulyak) remove contact type validation in that function
// simply always add topic and (if set) public key
_ = enhanceRequestOptions(histories[i].Contact, &opts)
}
return opts
}

View File

@ -0,0 +1,76 @@
package client
import (
"context"
"sort"
"testing"
"time"
"github.com/status-im/status-console-client/protocol/v1"
"github.com/stretchr/testify/require"
)
type requestsMock struct {
requests []protocol.RequestOptions
}
func (proto *requestsMock) Send(ctx context.Context, data []byte, options protocol.SendOptions) ([]byte, error) {
return nil, nil
}
func (proto *requestsMock) Subscribe(ctx context.Context, messages chan<- *protocol.Message, options protocol.SubscribeOptions) (*protocol.Subscription, error) {
return nil, nil
}
func (proto *requestsMock) Request(ctx context.Context, params protocol.RequestOptions) error {
proto.requests = append(proto.requests, params)
return nil
}
func TestRequestHistoryOneRequest(t *testing.T) {
db, err := InitializeTmpDB()
require.NoError(t, err)
defer db.Close()
proto := &requestsMock{}
m := NewMessengerV2(nil, proto, db)
require.NoError(t, db.SaveContacts([]Contact{
{Name: "first", Type: ContactPublicRoom},
{Name: "second", Type: ContactPublicRoom}}))
require.NoError(t, m.RequestAll(context.TODO(), true))
require.Len(t, proto.requests, 1)
histories, err := db.Histories()
require.NoError(t, err)
require.Len(t, histories, 2)
require.Equal(t, histories[0].Synced, proto.requests[0].To)
require.Equal(t, histories[1].Synced, proto.requests[0].To)
}
func TestRequestHistoryTwoRequest(t *testing.T) {
db, err := InitializeTmpDB()
require.NoError(t, err)
defer db.Close()
proto := &requestsMock{}
m := NewMessengerV2(nil, proto, db)
contacts := []Contact{
{Name: "first", Type: ContactPublicRoom, Topic: "first"},
{Name: "second", Type: ContactPublicRoom, Topic: "second"},
{Name: "third", Type: ContactPublicRoom, Topic: "third"},
}
require.NoError(t, db.SaveContacts(contacts))
histories := []History{{Synced: time.Now().Add(-time.Hour).Unix(), Contact: contacts[0]}}
require.NoError(t, db.UpdateHistories(histories))
require.NoError(t, m.RequestAll(context.TODO(), true))
require.Len(t, proto.requests, 2)
sort.Slice(proto.requests, func(i, j int) bool {
return proto.requests[i].From < proto.requests[j].From
})
require.Len(t, proto.requests[0].Chats, 2)
require.Len(t, proto.requests[1].Chats, 1)
require.Equal(t, histories[0].Contact.Name, proto.requests[1].Chats[0].ChatName)
require.Equal(t, histories[0].Synced, proto.requests[1].From)
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"log"
"sync"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
@ -88,13 +89,6 @@ func (m *MessengerV2) Join(ctx context.Context, c Contact) error {
}
func (m *MessengerV2) joinPrivate(ctx context.Context, c Contact) (err error) {
// FIXME(dshulyak) don't request messages on every join
// all messages must be requested in a single request when app starts
defer func() {
if err == nil {
err = m.Request(ctx, c, protocol.DefaultRequestOptions())
}
}()
_, exist := m.private[c.Topic]
if exist {
return
@ -111,17 +105,19 @@ func (m *MessengerV2) joinPrivate(ctx context.Context, c Contact) (err error) {
return err
}
m.private[c.Name] = stream
return
opts := protocol.DefaultRequestOptions()
err = enhanceRequestOptions(c, &opts)
if err != nil {
return err
}
err = m.Request(ctx, c, opts)
if err == nil {
err = m.db.UpdateHistories([]History{{Contact: c, Synced: opts.To}})
}
return err
}
func (m *MessengerV2) joinPublic(ctx context.Context, c Contact) (err error) {
// FIXME(dshulyak) don't request messages on every join
// all messages must be requested in a single request when app starts
defer func() {
if err == nil {
err = m.Request(ctx, c, protocol.DefaultRequestOptions())
}
}()
_, exist := m.public[c.Topic]
if exist {
// FIXME(dshulyak) don't request messages on every join
@ -140,7 +136,16 @@ func (m *MessengerV2) joinPublic(ctx context.Context, c Contact) (err error) {
return
}
m.public[c.Name] = stream
return
opts := protocol.DefaultRequestOptions()
err = enhanceRequestOptions(c, &opts)
if err != nil {
return err
}
err = m.Request(ctx, c, opts)
if err == nil {
err = m.db.UpdateHistories([]History{{Contact: c, Synced: opts.To}})
}
return err
}
// Messages reads all messages from database.
@ -156,19 +161,56 @@ func (m *MessengerV2) Request(ctx context.Context, c Contact, options protocol.R
return m.proto.Request(ctx, options)
}
func (m *MessengerV2) requestHistories(ctx context.Context, histories []History, opts protocol.RequestOptions) error {
log.Printf("[messenger::RequestAll] requesting messages for chats %+v: from %d to %d\n", opts.Chats, opts.From, opts.To)
start := time.Now()
err := m.proto.Request(ctx, opts)
if err != nil {
return err
}
log.Printf("[messenger::RequestAll] requesting message for chats %+v finished. took %v\n", opts.Chats, time.Since(start))
for i := range histories {
histories[i].Synced = opts.To
}
err = m.db.UpdateHistories(histories)
return err
}
func (m *MessengerV2) RequestAll(ctx context.Context, newest bool) error {
contacts, err := m.db.Contacts()
// FIXME(dshulyak) if newest is false request 24 hour of messages older then the
// earliest envelope for each contact.
histories, err := m.db.Histories()
if err != nil {
return errors.Wrap(err, "error fetching contacts")
}
requestParams := protocol.DefaultRequestOptions()
for _, c := range contacts {
err = enhanceRequestOptions(c, &requestParams)
var (
now = time.Now()
synced, notsynced = splitIntoSyncedNotSynced(histories)
errors = make(chan error, 2)
wg sync.WaitGroup
)
if len(synced) != 0 {
wg.Add(1)
go func() {
errors <- m.requestHistories(ctx, synced, syncedToOpts(synced, now))
wg.Done()
}()
}
if len(notsynced) != 0 {
wg.Add(1)
go func() {
errors <- m.requestHistories(ctx, notsynced, notsyncedToOpts(notsynced, now))
wg.Done()
}()
}
wg.Wait()
close(errors)
for err := range errors {
if err != nil {
return err
}
}
return m.proto.Request(ctx, requestParams)
return nil
}
func (m *MessengerV2) Send(c Contact, data []byte) error {

View File

@ -1,2 +1,3 @@
DROP TABLE user_messages;
DROP TABLE user_contacts;
DROP TABLE history_user_contact_topic;

View File

@ -21,3 +21,9 @@ type INT NOT NULL,
state INT,
public_key BLOB
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS history_user_contact_topic (
synced BIGINT DEFAULT 0 NOT NULL,
contact_id VARCHAR UNIQUE NOT NULL,
FOREIGN KEY(contact_id) REFERENCES user_contacts(id) ON DELETE CASCADE
);

View File

@ -25,7 +25,7 @@ func bindata_read(data []byte, name string) ([]byte, error) {
return buf.Bytes(), nil
}
var __0001_add_messages_contacts_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xe3\x7e\xc7\x78\x34\x00\x00\x00")
var __0001_add_messages_contacts_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x41\x95\xc9\xc8\x2c\x2e\xc9\x2f\xaa\x8c\x47\x56\x11\x5f\x92\x5f\x90\x99\x6c\xcd\x05\x08\x00\x00\xff\xff\x1b\x57\x14\x62\x5b\x00\x00\x00")
func _0001_add_messages_contacts_down_db_sql() ([]byte, error) {
return bindata_read(
@ -34,7 +34,7 @@ func _0001_add_messages_contacts_down_db_sql() ([]byte, error) {
)
}
var __0001_add_messages_contacts_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xcd\x4e\xc3\x30\x10\x84\xef\x7e\x8a\x3d\x16\x29\x6f\xd0\x53\xd2\x1a\xba\x22\xd8\xe0\x3a\x34\x3d\x45\xc6\xb5\xc0\x6a\xf3\x23\xbc\x95\xe8\xdb\x23\x2c\x87\x84\xaa\x5c\x67\x77\x67\x67\xbe\x95\xe2\xb9\xe6\xa0\xf3\xa2\xe4\x80\xf7\x20\xa4\x06\x5e\xe3\x56\x6f\xe1\x1c\xdc\x67\xd3\xba\x10\xcc\xbb\x0b\xb0\x60\xfe\x00\x45\x29\x0b\xa8\x04\xbe\x54\x3c\x6e\x8a\xaa\x2c\x33\x66\xfb\x8e\x8c\xa5\xc6\x1f\xe0\x35\x57\xab\x4d\xae\xae\x86\xae\xa3\x86\x2e\x83\x1b\xc7\x19\x4b\xb6\x57\x2a\xb9\x2f\x02\xcd\x6b\x9d\x31\x7b\xea\xed\x11\x0a\x7c\x40\xa1\x33\x46\xbe\x75\x81\x4c\x3b\xfc\x2a\xa3\xad\xfd\x30\xf1\x71\xba\x1a\x9f\x4d\x46\xc3\xf9\xed\xe4\x6d\x73\x74\x97\x98\x9e\xdd\x2d\x19\x4b\xa5\x51\xac\x79\x0d\x53\xfa\x00\x52\xfc\x6d\xbd\x98\x86\xb3\xbb\x7f\x61\xa5\xed\x04\x6b\x64\xf1\xac\xf0\x29\x57\x7b\x78\xe4\xfb\x19\x97\xce\xb4\xee\x06\x2e\xea\x07\x6f\x63\xf4\xb9\xf8\x43\x09\xc5\x5c\x0a\x64\x28\x6a\x37\x1a\xc2\x0e\xf5\x46\x56\x1a\x94\xdc\xe1\x7a\xc9\xbe\x03\x00\x00\xff\xff\x70\xc1\xe7\x30\xe4\x01\x00\x00")
var __0001_add_messages_contacts_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x7c\x92\xcf\x6e\xb3\x30\x10\xc4\xef\x7e\x8a\x3d\xf2\x49\x1c\xbe\x7b\x4f\x06\x96\xc4\x2a\xb5\x5b\xc7\x34\xc9\x09\x51\xc7\x6a\x50\xc2\x1f\xc5\x8e\x54\xde\xbe\x82\x42\xe3\xa6\x69\xae\x33\xf6\x7a\xe6\xb7\x8e\x25\x52\x85\xa0\x68\x94\x21\xb0\x14\xb8\x50\x80\x1b\xb6\x52\x2b\x38\x5b\x73\x2a\x6a\x63\x6d\xf9\x6e\x2c\x04\xa4\xda\x41\x94\x89\x08\x72\xce\x5e\x72\x1c\x4f\xf2\x3c\xcb\x42\xa2\xdb\xc6\x95\xda\x15\xd5\x0e\x5e\xa9\x8c\x97\x54\x5e\x99\xa6\x71\x85\xeb\x3b\x33\xdb\x21\x99\xc6\x5e\xa9\xce\x7c\x38\x50\xb8\x51\x21\xd1\xc7\x56\x1f\x20\x62\x0b\xc6\x55\x48\x5c\x55\x1b\xeb\xca\xba\xfb\x56\xe6\xb1\x7a\x5f\x8e\x0f\x4f\xb7\xe6\xc7\x2e\x83\xba\xf3\xdb\xb1\xd2\xc5\xc1\xf4\x63\x7a\xf2\xef\x81\x90\xa9\x34\xe3\x09\x6e\xe0\x92\xde\x82\xe0\x3f\x5b\x07\x17\xd3\xbb\xf7\x27\xac\xe9\xf4\x04\x6b\x66\xf1\x2c\xd9\x13\x95\x5b\x78\xc4\xad\xc7\xa5\x29\x6b\x73\x03\x97\x6b\xbb\x4a\x8f\xd1\x7d\x71\xa0\xc4\xb8\x2f\x59\x57\xba\x51\xbb\xd1\x10\xd6\x4c\x2d\x45\xae\x40\x8a\x35\x4b\xee\xe7\xde\x57\xd6\xb5\xa7\xbe\xf0\xf3\x17\x5f\x21\x02\x62\xfb\x46\x9b\xdd\xc4\x1c\x12\x4c\x69\x9e\x29\xf8\x7f\x7f\xf5\xbf\xbe\x47\x2a\x24\xb2\x05\x1f\xfa\xfb\x3c\x41\x62\x8a\x12\x79\x8c\x57\xf4\x82\xc1\x14\x1c\x12\xcc\x50\x21\xc4\x74\x15\xd3\x04\x87\xc5\x7d\x06\x00\x00\xff\xff\x28\x6d\x42\x61\xad\x02\x00\x00")
func _0001_add_messages_contacts_up_db_sql() ([]byte, error) {
return bindata_read(