Add topics to contact (#37)

This commit is contained in:
Dmitry Shulyak 2019-05-15 14:00:04 +03:00 committed by GitHub
parent 22a2ee2d02
commit bb6cf0b2a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 196 additions and 320 deletions

View File

@ -61,7 +61,7 @@ func TestSendMessage(t *testing.T) {
messenger := client.NewMessengerV2(identity, &chatMock, db) messenger := client.NewMessengerV2(identity, &chatMock, db)
vc := NewChatViewController(nil, nil, &messenger, nil) vc := NewChatViewController(nil, nil, &messenger, nil)
err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom}) err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom, Topic: chatName})
require.NoError(t, err) require.NoError(t, err)
// close reading loops // close reading loops
defer close(vc.cancel) defer close(vc.cancel)

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"github.com/status-im/status-console-client/protocol/adapters" "github.com/status-im/status-console-client/protocol/adapters"
"github.com/status-im/status-console-client/protocol/client"
) )
var ( var (
@ -21,12 +22,12 @@ func main() {
log.Println("flags:", *publicTopic, *privateTopic, *output) log.Println("flags:", *publicTopic, *privateTopic, *output)
if *publicTopic != "" { if *publicTopic != "" {
topic, err := adapters.PublicChatTopic(*publicTopic) topic, err := adapters.ToTopic(*publicTopic)
exitErr(err) exitErr(err)
printOutput(topic) printOutput(topic)
} else if *privateTopic { } else if *privateTopic {
topic, err := adapters.PrivateChatTopic() topic, err := adapters.ToTopic(client.DefaultPrivateTopic())
exitErr(err) exitErr(err)
printOutput(topic) printOutput(topic)

View File

@ -69,9 +69,11 @@ func bytesToArgs(b []byte) []string {
func contactAddCmdHandler(args []string) (c client.Contact, err error) { func contactAddCmdHandler(args []string) (c client.Contact, err error) {
if len(args) == 1 { if len(args) == 1 {
name := args[0]
c = client.Contact{ c = client.Contact{
Name: args[0], Name: name,
Type: client.ContactPublicRoom, Type: client.ContactPublicRoom,
Topic: name,
} }
} else if len(args) == 2 { } else if len(args) == 2 {
c, err = client.ContactWithPublicKey(args[1], args[0]) c, err = client.ContactWithPublicKey(args[1], args[0])
@ -94,6 +96,7 @@ func ContactCmdFactory(c *ContactsViewController) CmdHandler {
if err != nil { if err != nil {
return err return err
} }
log.Printf("adding contact with topic %s\n", contact.Topic)
if err := c.Add(contact); err != nil { if err := c.Add(contact); err != nil {
return err return err
} }

View File

@ -129,19 +129,21 @@ func main() {
if contacts, err := db.Contacts(); len(contacts) == 0 || err != nil { if contacts, err := db.Contacts(); len(contacts) == 0 || err != nil {
debugContacts := []client.Contact{ debugContacts := []client.Contact{
{Name: "status", Type: client.ContactPublicRoom}, {Name: "status", Type: client.ContactPublicRoom, Topic: "status"},
{Name: "status-core", Type: client.ContactPublicRoom}, {Name: "status-core", Type: client.ContactPublicRoom, Topic: "status-core"},
{Name: "testing-adamb", Type: client.ContactPublicRoom}, {Name: "testing-adamb", Type: client.ContactPublicRoom, Topic: "testing-adamb"},
adambContact, adambContact,
} }
if err := db.SaveContacts(debugContacts); err != nil { if err := db.SaveContacts(debugContacts); err != nil {
exitErr(err) exitErr(err)
} }
} }
go func() {
err = messenger.Start() err = messenger.Start()
if err != nil { if err != nil {
exitErr(err) exitErr(err)
} }
}()
if !*noUI { if !*noUI {
if err := setupGUI(privateKey, messenger); err != nil { if err := setupGUI(privateKey, messenger); err != nil {

View File

@ -260,7 +260,7 @@ func (c *criteria) ToWhisper() whisper.Criteria {
} }
func (c *criteria) updateForPublicGroup(name string) error { func (c *criteria) updateForPublicGroup(name string) error {
topic, err := PublicChatTopic(name) topic, err := ToTopic(name)
if err != nil { if err != nil {
return err return err
} }
@ -275,8 +275,8 @@ func (c *criteria) updateForPublicGroup(name string) error {
return nil return nil
} }
func (c *criteria) updateForPrivate(recipient *ecdsa.PublicKey) error { func (c *criteria) updateForPrivate(name string, recipient *ecdsa.PublicKey) error {
topic, err := PrivateChatTopic() topic, err := ToTopic(name)
if err != nil { if err != nil {
return err return err
} }
@ -292,8 +292,8 @@ func (c *criteria) updateForPrivate(recipient *ecdsa.PublicKey) error {
} }
func updateCriteriaFromSubscribeOptions(c *criteria, options protocol.SubscribeOptions) error { func updateCriteriaFromSubscribeOptions(c *criteria, options protocol.SubscribeOptions) error {
if options.Recipient != nil { if options.Recipient != nil && options.ChatName != "" {
return c.updateForPrivate(options.Recipient) return c.updateForPrivate(options.ChatName, options.Recipient)
} else if options.ChatName != "" { } else if options.ChatName != "" {
return c.updateForPublicGroup(options.ChatName) return c.updateForPublicGroup(options.ChatName)
} else { } else {

View File

@ -36,8 +36,8 @@ func (m *newMessage) ToWhisper() whisper.NewMessage {
return m.NewMessage return m.NewMessage
} }
func (m *newMessage) updateForPrivate(recipient *ecdsa.PublicKey) (err error) { func (m *newMessage) updateForPrivate(name string, recipient *ecdsa.PublicKey) (err error) {
m.Topic, err = PrivateChatTopic() m.Topic, err = ToTopic(name)
if err != nil { if err != nil {
return return
} }
@ -48,7 +48,7 @@ func (m *newMessage) updateForPrivate(recipient *ecdsa.PublicKey) (err error) {
} }
func (m *newMessage) updateForPublicGroup(name string) (err error) { func (m *newMessage) updateForPublicGroup(name string) (err error) {
m.Topic, err = PublicChatTopic(name) m.Topic, err = ToTopic(name)
if err != nil { if err != nil {
return return
} }
@ -58,8 +58,8 @@ func (m *newMessage) updateForPublicGroup(name string) (err error) {
} }
func updateNewMessageFromSendOptions(m *newMessage, options protocol.SendOptions) error { func updateNewMessageFromSendOptions(m *newMessage, options protocol.SendOptions) error {
if options.Recipient != nil { if options.Recipient != nil && options.ChatName != "" {
return m.updateForPrivate(options.Recipient) return m.updateForPrivate(options.ChatName, options.Recipient)
} else if options.ChatName != "" { } else if options.ChatName != "" {
return m.updateForPublicGroup(options.ChatName) return m.updateForPublicGroup(options.ChatName)
} else { } else {

View File

@ -402,7 +402,7 @@ func (f *filter) ToWhisper() *whisper.Filter {
} }
func (f *filter) updateForPublicGroup(name string) error { func (f *filter) updateForPublicGroup(name string) error {
topic, err := PublicChatTopic(name) topic, err := ToTopic(name)
if err != nil { if err != nil {
return err return err
} }
@ -421,8 +421,8 @@ func (f *filter) updateForPublicGroup(name string) error {
return nil return nil
} }
func (f *filter) updateForPrivate(recipient *ecdsa.PublicKey) error { func (f *filter) updateForPrivate(name string, recipient *ecdsa.PublicKey) error {
topic, err := PrivateChatTopic() topic, err := ToTopic(name)
if err != nil { if err != nil {
return err return err
} }
@ -434,8 +434,8 @@ func (f *filter) updateForPrivate(recipient *ecdsa.PublicKey) error {
} }
func updateFilterFromSubscribeOptions(f *filter, options protocol.SubscribeOptions) error { func updateFilterFromSubscribeOptions(f *filter, options protocol.SubscribeOptions) error {
if options.Recipient != nil { if options.Recipient != nil && options.ChatName != "" {
return f.updateForPrivate(options.Recipient) return f.updateForPrivate(options.ChatName, options.Recipient)
} else if options.ChatName != "" { } else if options.ChatName != "" {
return f.updateForPublicGroup(options.ChatName) return f.updateForPublicGroup(options.ChatName)
} else { } else {

View File

@ -1,23 +0,0 @@
package adapters
import (
whisper "github.com/status-im/whisper/whisperv6"
"golang.org/x/crypto/sha3"
)
// PublicChatTopic returns a Whisper topic for a public channel name.
func PublicChatTopic(name string) (whisper.TopicType, error) {
hash := sha3.NewLegacyKeccak256()
if _, err := hash.Write([]byte(name)); err != nil {
return whisper.TopicType{}, err
}
return whisper.BytesToTopic(hash.Sum(nil)), nil
}
// PrivateChatTopic returns a Whisper topic for a private chat.
// FIXME(dshulyak) TopicDiscovery is selected by an application not protocol.
// Move it one layer higher.
func PrivateChatTopic() (whisper.TopicType, error) {
return PublicChatTopic(TopicDiscovery)
}

View File

@ -6,6 +6,7 @@ import (
"github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-console-client/protocol/v1"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
whisper "github.com/status-im/whisper/whisperv6" whisper "github.com/status-im/whisper/whisperv6"
"golang.org/x/crypto/sha3"
) )
func createShhextRequestMessagesParam(enode, mailSymKeyID string, options protocol.RequestOptions) (shhext.MessagesRequest, error) { func createShhextRequestMessagesParam(enode, mailSymKeyID string, options protocol.RequestOptions) (shhext.MessagesRequest, error) {
@ -29,13 +30,18 @@ func createShhextRequestMessagesParam(enode, mailSymKeyID string, options protoc
} }
func topicForChatOptions(options protocol.ChatOptions) (whisper.TopicType, error) { func topicForChatOptions(options protocol.ChatOptions) (whisper.TopicType, error) {
if options.Recipient != nil {
return PrivateChatTopic()
}
if options.ChatName != "" { if options.ChatName != "" {
return PublicChatTopic(options.ChatName) return ToTopic(options.ChatName)
} }
return whisper.TopicType{}, errors.New("invalid options") return whisper.TopicType{}, errors.New("invalid options")
} }
// ToTopic returns a Whisper topic for a channel name.
func ToTopic(name string) (whisper.TopicType, error) {
hash := sha3.NewLegacyKeccak256()
if _, err := hash.Write([]byte(name)); err != nil {
return whisper.TopicType{}, err
}
return whisper.BytesToTopic(hash.Sum(nil)), nil
}

View File

@ -0,0 +1,9 @@
package client
const (
TopicDiscovery = "contact-discovery"
)
func DefaultPrivateTopic() string {
return TopicDiscovery
}

View File

@ -52,6 +52,7 @@ type Contact struct {
Name string `json:"name"` Name string `json:"name"`
Type ContactType `json:"type"` Type ContactType `json:"type"`
State ContactState `json:"state"` State ContactState `json:"state"`
Topic string `json:"topic"`
PublicKey *ecdsa.PublicKey `json:"-"` PublicKey *ecdsa.PublicKey `json:"-"`
} }
@ -110,7 +111,7 @@ func (c *Contact) UnmarshalJSON(data []byte) error {
func ContactWithPublicKey(name, pubKeyHex string) (c Contact, err error) { func ContactWithPublicKey(name, pubKeyHex string) (c Contact, err error) {
c.Name = name c.Name = name
c.Type = ContactPublicKey c.Type = ContactPublicKey
c.Topic = DefaultPrivateTopic()
pubKeyBytes, err := hexutil.Decode(pubKeyHex) pubKeyBytes, err := hexutil.Decode(pubKeyHex)
if err != nil { if err != nil {
return return
@ -119,13 +120,3 @@ func ContactWithPublicKey(name, pubKeyHex string) (c Contact, err error) {
c.PublicKey, err = crypto.UnmarshalPubkey(pubKeyBytes) c.PublicKey, err = crypto.UnmarshalPubkey(pubKeyBytes)
return return
} }
// ContainsContact check if a slice contains a given contact.
func ContainsContact(cs []Contact, c Contact) bool {
for _, item := range cs {
if item == c {
return true
}
}
return false
}

View File

@ -31,7 +31,7 @@ func TestContactMarshalUnmarshal(t *testing.T) {
Name: "status", Name: "status",
Type: ContactPublicRoom, Type: ContactPublicRoom,
}, },
result: `{"name":"status","type":"ContactPublicRoom","state":0}`, result: `{"name":"status","type":"ContactPublicRoom","state":0,"topic":""}`,
}, },
{ {
name: "ContactPublicKey", name: "ContactPublicKey",
@ -40,7 +40,7 @@ func TestContactMarshalUnmarshal(t *testing.T) {
Type: ContactPublicKey, Type: ContactPublicKey,
PublicKey: &privateKey.PublicKey, PublicKey: &privateKey.PublicKey,
}, },
result: fmt.Sprintf(`{"name":"user1","type":"ContactPublicKey","state":0,"public_key":"%s"}`, publicKeyStr), result: fmt.Sprintf(`{"name":"user1","type":"ContactPublicKey","state":0,"topic":"","public_key":"%s"}`, publicKeyStr),
}, },
} }

View File

@ -150,7 +150,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) {
if err != nil { if err != nil {
return err return err
} }
stmt, err = tx.Prepare("INSERT OR REPLACE INTO user_contacts(id, name, type, state, public_key) VALUES (?, ?, ?, ?, ?)") stmt, err = tx.Prepare("INSERT OR REPLACE INTO user_contacts(id, name, type, state, topic, public_key) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil { if err != nil {
return err return err
} }
@ -175,7 +175,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) {
pkey := append([]byte{}, buf.Bytes()...) pkey := append([]byte{}, buf.Bytes()...)
buf.Reset() buf.Reset()
id := fmt.Sprintf("%s:%d", contacts[i].Name, contacts[i].Type) id := fmt.Sprintf("%s:%d", contacts[i].Name, contacts[i].Type)
_, err = stmt.Exec(id, contacts[i].Name, contacts[i].Type, contacts[i].State, pkey) _, err = stmt.Exec(id, contacts[i].Name, contacts[i].Type, contacts[i].State, contacts[i].Topic, pkey)
if err != nil { if err != nil {
return err return err
} }
@ -185,7 +185,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) {
// Contacts returns all available contacts. // Contacts returns all available contacts.
func (db SQLLiteDatabase) Contacts() ([]Contact, error) { func (db SQLLiteDatabase) Contacts() ([]Contact, error) {
rows, err := db.db.Query("SELECT name, type, state, public_key FROM user_contacts") rows, err := db.db.Query("SELECT name, type, state, topic, public_key FROM user_contacts")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -200,7 +200,7 @@ func (db SQLLiteDatabase) Contacts() ([]Contact, error) {
dec := gob.NewDecoder(&buf) dec := gob.NewDecoder(&buf)
contact := Contact{} contact := Contact{}
pkey := []byte{} pkey := []byte{}
err = rows.Scan(&contact.Name, &contact.Type, &contact.State, &pkey) err = rows.Scan(&contact.Name, &contact.Type, &contact.State, &contact.Topic, &pkey)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,6 +20,7 @@ func TestContactReplacedBySameName(t *testing.T) {
Name: "first", Name: "first",
Type: ContactPublicRoom, Type: ContactPublicRoom,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "first",
} }
require.NoError(t, db.SaveContacts([]Contact{contact})) require.NoError(t, db.SaveContacts([]Contact{contact}))
require.NoError(t, db.SaveContacts([]Contact{contact})) require.NoError(t, db.SaveContacts([]Contact{contact}))
@ -42,6 +43,7 @@ func TestMessagesFilteredAndOrderedByTimestamp(t *testing.T) {
Name: "test", Name: "test",
Type: ContactPublicRoom, Type: ContactPublicRoom,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "first",
} }
require.NoError(t, db.SaveContacts([]Contact{contact})) require.NoError(t, db.SaveContacts([]Contact{contact}))
contacts, err := db.Contacts() contacts, err := db.Contacts()
@ -77,6 +79,7 @@ func TestSaveMessagesUniqueConstraint(t *testing.T) {
contact := Contact{ contact := Contact{
Name: "test", Name: "test",
Type: ContactPublicRoom, Type: ContactPublicRoom,
Topic: "first",
} }
sameid := []byte("1") sameid := []byte("1")
msg1 := protocol.Message{ msg1 := protocol.Message{
@ -109,6 +112,7 @@ func TestGetLastMessageClock(t *testing.T) {
contact := Contact{ contact := Contact{
Name: "test", Name: "test",
Type: ContactPublicRoom, Type: ContactPublicRoom,
Topic: "first",
} }
_, err = db.SaveMessages(contact, messages) _, err = db.SaveMessages(contact, messages)
require.NoError(t, err) require.NoError(t, err)
@ -127,6 +131,7 @@ func TestPublicContactExist(t *testing.T) {
Name: "first", Name: "first",
Type: ContactPublicKey, Type: ContactPublicKey,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "first",
} }
require.NoError(t, db.SaveContacts([]Contact{contact})) require.NoError(t, db.SaveContacts([]Contact{contact}))
exists, err := db.PublicContactExist(contact) exists, err := db.PublicContactExist(contact)
@ -145,16 +150,19 @@ func BenchmarkLoadMessages(b *testing.B) {
Name: "first", Name: "first",
Type: ContactPublicKey, Type: ContactPublicKey,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "test",
}, },
{ {
Name: "second", Name: "second",
Type: ContactPublicKey, Type: ContactPublicKey,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "test",
}, },
{ {
Name: "third", Name: "third",
Type: ContactPublicKey, Type: ContactPublicKey,
PublicKey: &pk.PublicKey, PublicKey: &pk.PublicKey,
Topic: "test",
}, },
} }
count := 10000 count := 10000

View File

@ -19,6 +19,7 @@ func NewMessengerV2(identity *ecdsa.PrivateKey, proto protocol.Protocol, db Data
db: NewDatabaseWithEvents(db, feed), db: NewDatabaseWithEvents(db, feed),
public: map[string]AsyncStream{}, public: map[string]AsyncStream{},
private: map[string]AsyncStream{},
// FIXME(dshulyak) add sufficient buffer to this channel // FIXME(dshulyak) add sufficient buffer to this channel
// it may block stream that receives messages // it may block stream that receives messages
events: feed, events: feed,
@ -32,22 +33,11 @@ type MessengerV2 struct {
mu sync.Mutex mu sync.Mutex
public map[string]AsyncStream public map[string]AsyncStream
private AsyncStream private map[string]AsyncStream
events chan interface{} events chan interface{}
} }
func NewMessanger(identity *ecdsa.PrivateKey, db Database, proto protocol.Protocol) MessengerV2 {
return MessengerV2{
identity: identity,
db: db,
proto: proto,
public: map[string]AsyncStream{},
events: make(chan interface{}),
}
}
func (m *MessengerV2) Start() error { func (m *MessengerV2) Start() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -55,29 +45,35 @@ func (m *MessengerV2) Start() error {
if err != nil { if err != nil {
return errors.Wrap(err, "unable to read contacts from database") return errors.Wrap(err, "unable to read contacts from database")
} }
private := []Contact{}
for i := range contacts { for i := range contacts {
if contacts[i].Type == ContactPublicKey { options, err := createSubscribeOptions(contacts[i])
private = append(private, contacts[i])
} else {
stream := NewStream(context.Background(), contacts[i], m.proto, NewPublicHandler(contacts[i], m.db))
err := stream.Start()
if err != nil { if err != nil {
return errors.Wrap(err, "unable to start stream") return err
} }
m.public[contacts[i].Name] = stream if contacts[i].Type == ContactPublicKey {
_, exist := m.private[contacts[i].Topic]
if exist {
continue
} }
} stream := NewStream(context.Background(), options, m.proto, NewPrivateHandler(m.db))
// FIXME(dshulyak) even if we have no private contacts we still should start a stream for private messages.
// this requires moving topic one level higher, from whisper adapter to the client
if len(private) != 0 {
any := private[0]
stream := NewStream(context.Background(), any, m.proto, NewPrivateHandler(private, m.db))
err := stream.Start() err := stream.Start()
if err != nil { if err != nil {
return errors.Wrap(err, "unable to start private stream") return errors.Wrap(err, "unable to start private stream")
} }
m.private = stream m.private[contacts[i].Topic] = stream
} else {
_, exist := m.public[contacts[i].Topic]
if exist {
return fmt.Errorf("multiple public chats with same topic: %s", contacts[i].Topic)
}
stream := NewStream(context.Background(), options, m.proto, NewPublicHandler(contacts[i], m.db))
err := stream.Start()
if err != nil {
return errors.Wrap(err, "unable to start stream")
}
m.public[contacts[i].Topic] = stream
}
} }
log.Printf("[INFO] request messages from mail sever") log.Printf("[INFO] request messages from mail sever")
return m.RequestAll(context.Background(), true) return m.RequestAll(context.Background(), true)
@ -86,25 +82,66 @@ func (m *MessengerV2) Start() error {
func (m *MessengerV2) Join(ctx context.Context, c Contact) error { func (m *MessengerV2) Join(ctx context.Context, c Contact) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
err := m.db.SaveContacts([]Contact{c}) if c.Type == ContactPublicRoom {
if err != nil { return m.joinPublic(ctx, c)
return errors.Wrap(err, "can't add contact to db")
} }
_, exist := m.public[c.Name] return m.joinPrivate(ctx, c)
if c.Type == ContactPublicKey || exist { }
func (m *MessengerV2) joinPrivate(ctx context.Context, c Contact) (err error) {
// FIXME(dshulyak) don't request messages on every join // FIXME(dshulyak) don't request messages on every join
// all messages must be requested in a single request when app starts // all messages must be requested in a single request when app starts
return m.Request(ctx, c, protocol.DefaultRequestOptions()) defer func() {
if err == nil {
err = m.Request(ctx, c, protocol.DefaultRequestOptions())
} }
log.Printf("[INFO] created stream for contact %s\n", c) }()
stream := NewStream(context.Background(), c, m.proto, NewPublicHandler(c, m.db)) _, exist := m.private[c.Topic]
if exist {
return
}
var options protocol.SubscribeOptions
options, err = createSubscribeOptions(c)
if err != nil {
return err
}
stream := NewStream(context.Background(), options, m.proto, NewPrivateHandler(m.db))
err = stream.Start() err = stream.Start()
if err != nil { if err != nil {
return errors.Wrap(err, "can't subscribe to a stream") err = errors.Wrap(err, "can't subscribe to a stream")
return err
}
m.private[c.Name] = stream
return
}
func (m *MessengerV2) joinPublic(ctx context.Context, c Contact) (err error) {
// FIXME(dshulyak) don't request messages on every join
// all messages must be requested in a single request when app starts
defer func() {
if err == nil {
err = m.Request(ctx, c, protocol.DefaultRequestOptions())
}
}()
_, exist := m.public[c.Topic]
if exist {
// FIXME(dshulyak) don't request messages on every join
// all messages must be requested in a single request when app starts
return
}
var options protocol.SubscribeOptions
options, err = createSubscribeOptions(c)
if err != nil {
return err
}
stream := NewStream(context.Background(), options, m.proto, NewPublicHandler(c, m.db))
err = stream.Start()
if err != nil {
err = errors.Wrap(err, "can't subscribe to a stream")
return
} }
m.public[c.Name] = stream m.public[c.Name] = stream
log.Printf("[INFO] made request for new messages contact %s\n", c) return
return m.Request(ctx, c, protocol.DefaultRequestOptions())
} }
// Messages reads all messages from database. // Messages reads all messages from database.

View File

@ -16,6 +16,7 @@ CREATE INDEX contact_ids ON user_messages(contact_id);
CREATE TABLE IF NOT EXISTS user_contacts ( CREATE TABLE IF NOT EXISTS user_contacts (
id VARCHAR PRIMARY KEY NOT NULL, id VARCHAR PRIMARY KEY NOT NULL,
name VARCHAR NOT NULL, name VARCHAR NOT NULL,
topic TEXT NOT NULL,
type INT NOT NULL, type INT NOT NULL,
state INT, state INT,
public_key BLOB public_key BLOB

View File

@ -1,9 +1,3 @@
// Code generated by go-bindata.
// sources:
// 0001_add_messages_contacts.down.db.sql
// 0001_add_messages_contacts.up.db.sql
// DO NOT EDIT!
package migrations package migrations
import ( import (
@ -11,14 +5,10 @@ import (
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os"
"path/filepath"
"strings" "strings"
"time"
) )
func bindataRead(data []byte, name string) ([]byte, error) { func bindata_read(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data)) gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil { if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err) return nil, fmt.Errorf("Read %q: %v", name, err)
@ -26,130 +16,44 @@ func bindataRead(data []byte, name string) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
_, err = io.Copy(&buf, gz) _, err = io.Copy(&buf, gz)
clErr := gz.Close() gz.Close()
if err != nil { if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err) return nil, fmt.Errorf("Read %q: %v", name, err)
} }
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil return buf.Bytes(), nil
} }
type asset struct { var __0001_add_messages_contacts_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xe3\x7e\xc7\x78\x34\x00\x00\x00")
bytes []byte
info os.FileInfo
}
type bindataFileInfo struct { func _0001_add_messages_contacts_down_db_sql() ([]byte, error) {
name string return bindata_read(
size int64 __0001_add_messages_contacts_down_db_sql,
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __0001_add_messages_contactsDownDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xe3\x7e\xc7\x78\x34\x00\x00\x00")
func _0001_add_messages_contactsDownDbSqlBytes() ([]byte, error) {
return bindataRead(
__0001_add_messages_contactsDownDbSql,
"0001_add_messages_contacts.down.db.sql", "0001_add_messages_contacts.down.db.sql",
) )
} }
func _0001_add_messages_contactsDownDbSql() (*asset, error) { var __0001_add_messages_contacts_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xcd\x4e\xc3\x30\x10\x84\xef\x7e\x8a\x3d\x16\x29\x6f\xd0\x53\xd2\x1a\xba\x22\xd8\xe0\x3a\x34\x3d\x45\xc6\xac\x20\x6a\xf3\x23\xbc\x95\xe8\xdb\x23\xa2\x84\xb8\x55\xb9\xce\xce\xce\xee\x7c\x2b\x23\x53\x2b\xc1\xa6\x59\x2e\x01\xef\x41\x69\x0b\xb2\xc4\xad\xdd\xc2\x29\xd0\x57\xd5\x50\x08\xee\x83\x02\x2c\x44\xfd\x0e\xaf\xa9\x59\x6d\x52\x03\x85\xc2\x97\x42\x0e\x66\x55\xe4\x79\x22\x7c\xd7\xb2\xf3\x5c\x45\x9e\xcb\x21\xb5\x5c\xf1\xb9\xa7\x69\x9c\x88\x31\xf9\x4a\x65\xfa\x66\xb0\xb2\xb4\x89\xf0\xc7\xce\x1f\x20\xc3\x07\x54\x36\x11\x5c\x37\x14\xd8\x35\xfd\x9f\x32\xc5\xfa\x4f\x37\x1c\x1e\xb7\xa6\x63\x73\x50\x7f\x7a\x3b\xd6\xbe\x3a\xd0\x19\xb2\x5c\x67\xe2\x6e\x29\xc4\xd8\x1b\xd5\x5a\x96\x30\x7f\x1f\x40\xab\xcb\xe2\x8b\x79\x18\xed\xfd\xcb\x6b\x74\x5f\xf1\x7a\x36\xf8\x94\x9a\x3d\x3c\xca\x7d\xc4\xa5\x75\x0d\xdd\xc0\xc5\x5d\x5f\xfb\xe1\xf5\x58\xfc\xa5\x84\x2a\x96\x02\x3b\x1e\xb4\x1b\x0d\x61\x87\x76\xa3\x0b\x0b\x46\xef\x70\xbd\x14\x3f\x01\x00\x00\xff\xff\xb4\xd5\xa9\x88\xe7\x01\x00\x00")
bytes, err := _0001_add_messages_contactsDownDbSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "0001_add_messages_contacts.down.db.sql", size: 52, mode: os.FileMode(436), modTime: time.Unix(1557300707, 0)} func _0001_add_messages_contacts_up_db_sql() ([]byte, error) {
a := &asset{bytes: bytes, info: info} return bindata_read(
return a, nil __0001_add_messages_contacts_up_db_sql,
}
var __0001_add_messages_contactsUpDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xc1\x4e\xc3\x30\x10\x44\xef\xfe\x8a\x3d\x16\x29\x7f\xd0\x93\xd3\x1a\xba\x22\xd8\xe0\x6e\x68\x7a\x8a\x8c\xb1\x20\x6a\x93\x46\x78\x2b\xd1\xbf\x47\x8a\x12\x92\x56\xe9\x75\x66\x3c\xeb\x79\x2b\xab\x24\x29\x20\x99\x66\x0a\xf0\x11\xb4\x21\x50\x05\x6e\x69\x0b\xe7\x18\x7e\xca\x3a\xc4\xe8\xbe\x42\x84\x85\xa8\x3e\xe1\x5d\xda\xd5\x46\x5a\xc8\x35\xbe\xe5\xaa\x0b\xeb\x3c\xcb\x12\xe1\x4f\x0d\x3b\xcf\xe5\x24\x73\x6d\x86\x86\x4b\xbe\xb4\x61\xb0\x13\xd1\x37\xdf\xa8\x1c\x7e\x19\x48\x15\x94\x08\x7f\x3c\xf9\x03\xa4\xf8\x84\x9a\x12\xc1\x55\x1d\x22\xbb\xba\xfd\x57\x86\x5a\xff\xed\xba\xc3\xfd\xab\xe1\xd8\x58\xd4\x9e\x3f\x8e\x95\x2f\x0f\xe1\x02\x69\x66\x52\xf1\xb0\x14\xa2\xdf\x8d\x7a\xad\x0a\x18\x7f\x1f\xc1\xe8\xeb\xe1\x8b\xd1\x9c\xbc\xbb\xcb\xab\x4f\xdf\xf0\x7a\xb5\xf8\x22\xed\x1e\x9e\xd5\x7e\xc2\xa5\x71\x75\x98\xc1\xd5\x01\x41\x4d\x13\x29\xb2\xe3\x4e\x9b\x19\x03\x3b\xa4\x8d\xc9\x09\xac\xd9\xe1\x7a\x29\xfe\x02\x00\x00\xff\xff\xe9\x0b\x8c\x5e\xd2\x01\x00\x00")
func _0001_add_messages_contactsUpDbSqlBytes() ([]byte, error) {
return bindataRead(
__0001_add_messages_contactsUpDbSql,
"0001_add_messages_contacts.up.db.sql", "0001_add_messages_contacts.up.db.sql",
) )
} }
func _0001_add_messages_contactsUpDbSql() (*asset, error) {
bytes, err := _0001_add_messages_contactsUpDbSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "0001_add_messages_contacts.up.db.sql", size: 466, mode: os.FileMode(436), modTime: time.Unix(1557742893, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
// Asset loads and returns the asset for the given name. // Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or // It returns an error if the asset could not be found or
// could not be loaded. // could not be loaded.
func Asset(name string) ([]byte, error) { func Asset(name string) ([]byte, error) {
cannonicalName := strings.Replace(name, "\\", "/", -1) cannonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[cannonicalName]; ok { if f, ok := _bindata[cannonicalName]; ok {
a, err := f() return f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
} }
return nil, fmt.Errorf("Asset %s not found", name) return nil, fmt.Errorf("Asset %s not found", name)
} }
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
cannonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[cannonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetNames returns the names of the assets. // AssetNames returns the names of the assets.
func AssetNames() []string { func AssetNames() []string {
names := make([]string, 0, len(_bindata)) names := make([]string, 0, len(_bindata))
@ -160,11 +64,10 @@ func AssetNames() []string {
} }
// _bindata is a table, holding each asset generator, mapped to its name. // _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){ var _bindata = map[string]func() ([]byte, error){
"0001_add_messages_contacts.down.db.sql": _0001_add_messages_contactsDownDbSql, "0001_add_messages_contacts.down.db.sql": _0001_add_messages_contacts_down_db_sql,
"0001_add_messages_contacts.up.db.sql": _0001_add_messages_contactsUpDbSql, "0001_add_messages_contacts.up.db.sql": _0001_add_messages_contacts_up_db_sql,
} }
// AssetDir returns the file names below a certain // AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata. // directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the // For example if you run go-bindata on data/... and data contains the
@ -194,65 +97,19 @@ func AssetDir(name string) ([]string, error) {
return nil, fmt.Errorf("Asset %s not found", name) return nil, fmt.Errorf("Asset %s not found", name)
} }
rv := make([]string, 0, len(node.Children)) rv := make([]string, 0, len(node.Children))
for childName := range node.Children { for name := range node.Children {
rv = append(rv, childName) rv = append(rv, name)
} }
return rv, nil return rv, nil
} }
type bintree struct { type _bintree_t struct {
Func func() (*asset, error) Func func() ([]byte, error)
Children map[string]*bintree Children map[string]*_bintree_t
} }
var _bintree = &bintree{nil, map[string]*bintree{ var _bintree = &_bintree_t{nil, map[string]*_bintree_t{
"0001_add_messages_contacts.down.db.sql": &bintree{_0001_add_messages_contactsDownDbSql, map[string]*bintree{}}, "0001_add_messages_contacts.down.db.sql": &_bintree_t{_0001_add_messages_contacts_down_db_sql, map[string]*_bintree_t{
"0001_add_messages_contacts.up.db.sql": &bintree{_0001_add_messages_contactsUpDbSql, map[string]*bintree{}}, }},
"0001_add_messages_contacts.up.db.sql": &_bintree_t{_0001_add_messages_contacts_up_db_sql, map[string]*_bintree_t{
}},
}} }}
// RestoreAsset restores an asset under the given directory
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
if err != nil {
return err
}
return nil
}
// RestoreAssets restores an asset under the given directory recursively
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
cannonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
}

View File

@ -11,9 +11,9 @@ var (
) )
func createSubscribeOptions(c Contact) (opts protocol.SubscribeOptions, err error) { func createSubscribeOptions(c Contact) (opts protocol.SubscribeOptions, err error) {
opts.ChatName = c.Topic
switch c.Type { switch c.Type {
case ContactPublicRoom: case ContactPublicRoom:
opts.ChatName = c.Name
case ContactPublicKey: case ContactPublicKey:
opts.Recipient = c.PublicKey opts.Recipient = c.PublicKey
default: default:
@ -23,9 +23,9 @@ func createSubscribeOptions(c Contact) (opts protocol.SubscribeOptions, err erro
} }
func createSendOptions(c Contact) (opts protocol.SendOptions, err error) { func createSendOptions(c Contact) (opts protocol.SendOptions, err error) {
opts.ChatName = c.Topic
switch c.Type { switch c.Type {
case ContactPublicRoom: case ContactPublicRoom:
opts.ChatName = c.Name
case ContactPublicKey: case ContactPublicKey:
opts.Recipient = c.PublicKey opts.Recipient = c.PublicKey
default: default:
@ -36,10 +36,9 @@ func createSendOptions(c Contact) (opts protocol.SendOptions, err error) {
func enhanceRequestOptions(c Contact, opts *protocol.RequestOptions) error { func enhanceRequestOptions(c Contact, opts *protocol.RequestOptions) error {
var chatOptions protocol.ChatOptions var chatOptions protocol.ChatOptions
chatOptions.ChatName = c.Topic
switch c.Type { switch c.Type {
case ContactPublicRoom: case ContactPublicRoom:
chatOptions.ChatName = c.Name
case ContactPublicKey: case ContactPublicKey:
chatOptions.Recipient = c.PublicKey chatOptions.Recipient = c.PublicKey
default: default:

View File

@ -43,13 +43,8 @@ func (pub PublicStream) Handle(msg protocol.Message) error {
return nil return nil
} }
func NewPrivateHandler(contacts []Contact, db Database) StreamHandler { func NewPrivateHandler(db Database) StreamHandler {
keyed := map[string]Contact{}
for i := range contacts {
keyed[PubkeyToHex(contacts[i].PublicKey)] = contacts[i]
}
return PrivateStream{ return PrivateStream{
contacts: keyed,
db: db, db: db,
}.Handle }.Handle
} }
@ -58,7 +53,6 @@ func NewPrivateHandler(contacts []Contact, db Database) StreamHandler {
// In our case every message will have a pubkey (derived from signature) that will be used // In our case every message will have a pubkey (derived from signature) that will be used
// to determine who is the writer // to determine who is the writer
type PrivateStream struct { type PrivateStream struct {
contacts map[string]Contact // key is a hex from public key
db Database db Database
} }
@ -67,13 +61,12 @@ func (priv PrivateStream) Handle(msg protocol.Message) error {
return errors.New("message should be signed") return errors.New("message should be signed")
} }
keyhex := PubkeyToHex(msg.SigPubKey) keyhex := PubkeyToHex(msg.SigPubKey)
// FIXME(dshulyak) Check if contact exist in database
// preferably don't marshal key as a blob
contact := Contact{ contact := Contact{
Type: ContactPublicKey, Type: ContactPublicKey,
State: ContactNew, State: ContactNew,
Name: keyhex, // TODO(dshulyak) replace with 3-word funny name Name: keyhex, // TODO(dshulyak) replace with 3-word funny name
PublicKey: msg.SigPubKey, PublicKey: msg.SigPubKey,
Topic: DefaultPrivateTopic(),
} }
exist, err := priv.db.PublicContactExist(contact) exist, err := priv.db.PublicContactExist(contact)
if err != nil { if err != nil {
@ -101,8 +94,7 @@ type AsyncStream interface {
} }
type Stream struct { type Stream struct {
// TODO replace contact with a topic. content from single stream can be delivered to multiple contacts. options protocol.SubscribeOptions
contact Contact
proto protocol.Protocol proto protocol.Protocol
handler StreamHandler handler StreamHandler
@ -112,9 +104,9 @@ type Stream struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func NewStream(ctx context.Context, contact Contact, proto protocol.Protocol, handler StreamHandler) *Stream { func NewStream(ctx context.Context, options protocol.SubscribeOptions, proto protocol.Protocol, handler StreamHandler) *Stream {
return &Stream{ return &Stream{
contact: contact, options: options,
proto: proto, proto: proto,
handler: handler, handler: handler,
parent: ctx, parent: ctx,
@ -128,11 +120,7 @@ func (stream *Stream) Start() error {
ctx, cancel := context.WithCancel(stream.parent) ctx, cancel := context.WithCancel(stream.parent)
stream.cancel = cancel stream.cancel = cancel
msgs := make(chan *protocol.Message, 100) msgs := make(chan *protocol.Message, 100)
opts, err := createSubscribeOptions(stream.contact) sub, err := stream.proto.Subscribe(ctx, msgs, stream.options)
if err != nil {
return errors.Wrap(err, "failed to create subscribe options")
}
sub, err := stream.proto.Subscribe(ctx, msgs, opts)
if err != nil { if err != nil {
stream.cancel = nil stream.cancel = nil
return err return err

View File

@ -31,7 +31,7 @@ func TestPrivateStreamSavesNewContactsAndMessages(t *testing.T) {
pkey, err := crypto.GenerateKey() pkey, err := crypto.GenerateKey()
require.NoError(t, err) require.NoError(t, err)
private := NewPrivateHandler([]Contact{}, db) private := NewPrivateHandler(db)
msg := protocol.Message{ msg := protocol.Message{
ID: []byte{1}, ID: []byte{1},
SigPubKey: &pkey.PublicKey, SigPubKey: &pkey.PublicKey,

View File

@ -31,9 +31,6 @@ func (o ChatOptions) Validate() error {
if o == (ChatOptions{}) { if o == (ChatOptions{}) {
return errors.New("empty options") return errors.New("empty options")
} }
if o.ChatName != "" && o.Recipient != nil {
return errors.New("field ChatName and Recipient both set")
}
return nil return nil
} }