From bb6cf0b2a9784b85aa0f5e5a43577de083d94938 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 15 May 2019 14:00:04 +0300 Subject: [PATCH] Add topics to contact (#37) --- chat_test.go | 2 +- cmd/protocol/main.go | 5 +- input.go | 23 ++- main.go | 16 +- protocol/adapters/whisper_client.go | 10 +- protocol/adapters/whisper_message.go | 10 +- protocol/adapters/whisper_service.go | 10 +- protocol/adapters/whisper_topic.go | 23 --- protocol/adapters/whisper_utils.go | 16 +- protocol/client/constants.go | 9 + protocol/client/contact.go | 13 +- protocol/client/contact_test.go | 4 +- protocol/client/database.go | 8 +- protocol/client/database_test.go | 16 +- protocol/client/messenger_v2.go | 119 +++++++---- .../0001_add_messages_contacts.up.db.sql | 1 + protocol/client/migrations/bindata.go | 191 +++--------------- protocol/client/options.go | 7 +- protocol/client/stream.go | 28 +-- protocol/client/stream_test.go | 2 +- protocol/v1/protocol.go | 3 - 21 files changed, 196 insertions(+), 320 deletions(-) delete mode 100644 protocol/adapters/whisper_topic.go create mode 100644 protocol/client/constants.go diff --git a/chat_test.go b/chat_test.go index 5212b05..f118005 100644 --- a/chat_test.go +++ b/chat_test.go @@ -61,7 +61,7 @@ func TestSendMessage(t *testing.T) { messenger := client.NewMessengerV2(identity, &chatMock, db) vc := NewChatViewController(nil, nil, &messenger, nil) - err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom}) + err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom, Topic: chatName}) require.NoError(t, err) // close reading loops defer close(vc.cancel) diff --git a/cmd/protocol/main.go b/cmd/protocol/main.go index 1c0d830..e14b475 100644 --- a/cmd/protocol/main.go +++ b/cmd/protocol/main.go @@ -7,6 +7,7 @@ import ( "os" "github.com/status-im/status-console-client/protocol/adapters" + "github.com/status-im/status-console-client/protocol/client" ) var ( @@ -21,12 +22,12 @@ func main() { log.Println("flags:", *publicTopic, *privateTopic, *output) if *publicTopic != "" { - topic, err := adapters.PublicChatTopic(*publicTopic) + topic, err := adapters.ToTopic(*publicTopic) exitErr(err) printOutput(topic) } else if *privateTopic { - topic, err := adapters.PrivateChatTopic() + topic, err := adapters.ToTopic(client.DefaultPrivateTopic()) exitErr(err) printOutput(topic) diff --git a/input.go b/input.go index ab3e356..d73c79d 100644 --- a/input.go +++ b/input.go @@ -69,9 +69,11 @@ func bytesToArgs(b []byte) []string { func contactAddCmdHandler(args []string) (c client.Contact, err error) { if len(args) == 1 { + name := args[0] c = client.Contact{ - Name: args[0], - Type: client.ContactPublicRoom, + Name: name, + Type: client.ContactPublicRoom, + Topic: name, } } else if len(args) == 2 { c, err = client.ContactWithPublicKey(args[1], args[0]) @@ -94,19 +96,20 @@ func ContactCmdFactory(c *ContactsViewController) CmdHandler { if err != nil { return err } + log.Printf("adding contact with topic %s\n", contact.Topic) if err := c.Add(contact); err != nil { return err } // TODO: fix removing contacts // case "remove": - // if len(args) == 2 { - // if err := c.Remove(args[1]); err != nil { - // return err - // } - // c.Refresh() - // return nil - // } - // return errors.New("/contact: incorect arguments to remove subcommand") + // if len(args) == 2 { + // if err := c.Remove(args[1]); err != nil { + // return err + // } + // c.Refresh() + // return nil + // } + // return errors.New("/contact: incorect arguments to remove subcommand") } return nil diff --git a/main.go b/main.go index bbb46f7..1374901 100644 --- a/main.go +++ b/main.go @@ -129,19 +129,21 @@ func main() { if contacts, err := db.Contacts(); len(contacts) == 0 || err != nil { debugContacts := []client.Contact{ - {Name: "status", Type: client.ContactPublicRoom}, - {Name: "status-core", Type: client.ContactPublicRoom}, - {Name: "testing-adamb", Type: client.ContactPublicRoom}, + {Name: "status", Type: client.ContactPublicRoom, Topic: "status"}, + {Name: "status-core", Type: client.ContactPublicRoom, Topic: "status-core"}, + {Name: "testing-adamb", Type: client.ContactPublicRoom, Topic: "testing-adamb"}, adambContact, } if err := db.SaveContacts(debugContacts); err != nil { exitErr(err) } } - err = messenger.Start() - if err != nil { - exitErr(err) - } + go func() { + err = messenger.Start() + if err != nil { + exitErr(err) + } + }() if !*noUI { if err := setupGUI(privateKey, messenger); err != nil { diff --git a/protocol/adapters/whisper_client.go b/protocol/adapters/whisper_client.go index af98a35..605f903 100644 --- a/protocol/adapters/whisper_client.go +++ b/protocol/adapters/whisper_client.go @@ -260,7 +260,7 @@ func (c *criteria) ToWhisper() whisper.Criteria { } func (c *criteria) updateForPublicGroup(name string) error { - topic, err := PublicChatTopic(name) + topic, err := ToTopic(name) if err != nil { return err } @@ -275,8 +275,8 @@ func (c *criteria) updateForPublicGroup(name string) error { return nil } -func (c *criteria) updateForPrivate(recipient *ecdsa.PublicKey) error { - topic, err := PrivateChatTopic() +func (c *criteria) updateForPrivate(name string, recipient *ecdsa.PublicKey) error { + topic, err := ToTopic(name) if err != nil { return err } @@ -292,8 +292,8 @@ func (c *criteria) updateForPrivate(recipient *ecdsa.PublicKey) error { } func updateCriteriaFromSubscribeOptions(c *criteria, options protocol.SubscribeOptions) error { - if options.Recipient != nil { - return c.updateForPrivate(options.Recipient) + if options.Recipient != nil && options.ChatName != "" { + return c.updateForPrivate(options.ChatName, options.Recipient) } else if options.ChatName != "" { return c.updateForPublicGroup(options.ChatName) } else { diff --git a/protocol/adapters/whisper_message.go b/protocol/adapters/whisper_message.go index 8abe9ae..780f1e1 100644 --- a/protocol/adapters/whisper_message.go +++ b/protocol/adapters/whisper_message.go @@ -36,8 +36,8 @@ func (m *newMessage) ToWhisper() whisper.NewMessage { return m.NewMessage } -func (m *newMessage) updateForPrivate(recipient *ecdsa.PublicKey) (err error) { - m.Topic, err = PrivateChatTopic() +func (m *newMessage) updateForPrivate(name string, recipient *ecdsa.PublicKey) (err error) { + m.Topic, err = ToTopic(name) if err != nil { return } @@ -48,7 +48,7 @@ func (m *newMessage) updateForPrivate(recipient *ecdsa.PublicKey) (err error) { } func (m *newMessage) updateForPublicGroup(name string) (err error) { - m.Topic, err = PublicChatTopic(name) + m.Topic, err = ToTopic(name) if err != nil { return } @@ -58,8 +58,8 @@ func (m *newMessage) updateForPublicGroup(name string) (err error) { } func updateNewMessageFromSendOptions(m *newMessage, options protocol.SendOptions) error { - if options.Recipient != nil { - return m.updateForPrivate(options.Recipient) + if options.Recipient != nil && options.ChatName != "" { + return m.updateForPrivate(options.ChatName, options.Recipient) } else if options.ChatName != "" { return m.updateForPublicGroup(options.ChatName) } else { diff --git a/protocol/adapters/whisper_service.go b/protocol/adapters/whisper_service.go index 32871d7..e415b72 100644 --- a/protocol/adapters/whisper_service.go +++ b/protocol/adapters/whisper_service.go @@ -402,7 +402,7 @@ func (f *filter) ToWhisper() *whisper.Filter { } func (f *filter) updateForPublicGroup(name string) error { - topic, err := PublicChatTopic(name) + topic, err := ToTopic(name) if err != nil { return err } @@ -421,8 +421,8 @@ func (f *filter) updateForPublicGroup(name string) error { return nil } -func (f *filter) updateForPrivate(recipient *ecdsa.PublicKey) error { - topic, err := PrivateChatTopic() +func (f *filter) updateForPrivate(name string, recipient *ecdsa.PublicKey) error { + topic, err := ToTopic(name) if err != nil { return err } @@ -434,8 +434,8 @@ func (f *filter) updateForPrivate(recipient *ecdsa.PublicKey) error { } func updateFilterFromSubscribeOptions(f *filter, options protocol.SubscribeOptions) error { - if options.Recipient != nil { - return f.updateForPrivate(options.Recipient) + if options.Recipient != nil && options.ChatName != "" { + return f.updateForPrivate(options.ChatName, options.Recipient) } else if options.ChatName != "" { return f.updateForPublicGroup(options.ChatName) } else { diff --git a/protocol/adapters/whisper_topic.go b/protocol/adapters/whisper_topic.go deleted file mode 100644 index 68b946a..0000000 --- a/protocol/adapters/whisper_topic.go +++ /dev/null @@ -1,23 +0,0 @@ -package adapters - -import ( - whisper "github.com/status-im/whisper/whisperv6" - "golang.org/x/crypto/sha3" -) - -// PublicChatTopic returns a Whisper topic for a public channel name. -func PublicChatTopic(name string) (whisper.TopicType, error) { - hash := sha3.NewLegacyKeccak256() - if _, err := hash.Write([]byte(name)); err != nil { - return whisper.TopicType{}, err - } - - return whisper.BytesToTopic(hash.Sum(nil)), nil -} - -// PrivateChatTopic returns a Whisper topic for a private chat. -// FIXME(dshulyak) TopicDiscovery is selected by an application not protocol. -// Move it one layer higher. -func PrivateChatTopic() (whisper.TopicType, error) { - return PublicChatTopic(TopicDiscovery) -} diff --git a/protocol/adapters/whisper_utils.go b/protocol/adapters/whisper_utils.go index 2e5b1b6..003356c 100644 --- a/protocol/adapters/whisper_utils.go +++ b/protocol/adapters/whisper_utils.go @@ -6,6 +6,7 @@ import ( "github.com/status-im/status-console-client/protocol/v1" "github.com/status-im/status-go/services/shhext" whisper "github.com/status-im/whisper/whisperv6" + "golang.org/x/crypto/sha3" ) func createShhextRequestMessagesParam(enode, mailSymKeyID string, options protocol.RequestOptions) (shhext.MessagesRequest, error) { @@ -29,13 +30,18 @@ func createShhextRequestMessagesParam(enode, mailSymKeyID string, options protoc } func topicForChatOptions(options protocol.ChatOptions) (whisper.TopicType, error) { - if options.Recipient != nil { - return PrivateChatTopic() - } - if options.ChatName != "" { - return PublicChatTopic(options.ChatName) + return ToTopic(options.ChatName) } return whisper.TopicType{}, errors.New("invalid options") } + +// ToTopic returns a Whisper topic for a channel name. +func ToTopic(name string) (whisper.TopicType, error) { + hash := sha3.NewLegacyKeccak256() + if _, err := hash.Write([]byte(name)); err != nil { + return whisper.TopicType{}, err + } + return whisper.BytesToTopic(hash.Sum(nil)), nil +} diff --git a/protocol/client/constants.go b/protocol/client/constants.go new file mode 100644 index 0000000..04d24f8 --- /dev/null +++ b/protocol/client/constants.go @@ -0,0 +1,9 @@ +package client + +const ( + TopicDiscovery = "contact-discovery" +) + +func DefaultPrivateTopic() string { + return TopicDiscovery +} diff --git a/protocol/client/contact.go b/protocol/client/contact.go index b19161d..6fb2c62 100644 --- a/protocol/client/contact.go +++ b/protocol/client/contact.go @@ -52,6 +52,7 @@ type Contact struct { Name string `json:"name"` Type ContactType `json:"type"` State ContactState `json:"state"` + Topic string `json:"topic"` PublicKey *ecdsa.PublicKey `json:"-"` } @@ -110,7 +111,7 @@ func (c *Contact) UnmarshalJSON(data []byte) error { func ContactWithPublicKey(name, pubKeyHex string) (c Contact, err error) { c.Name = name c.Type = ContactPublicKey - + c.Topic = DefaultPrivateTopic() pubKeyBytes, err := hexutil.Decode(pubKeyHex) if err != nil { return @@ -119,13 +120,3 @@ func ContactWithPublicKey(name, pubKeyHex string) (c Contact, err error) { c.PublicKey, err = crypto.UnmarshalPubkey(pubKeyBytes) return } - -// ContainsContact check if a slice contains a given contact. -func ContainsContact(cs []Contact, c Contact) bool { - for _, item := range cs { - if item == c { - return true - } - } - return false -} diff --git a/protocol/client/contact_test.go b/protocol/client/contact_test.go index 689b0e7..c4442c1 100644 --- a/protocol/client/contact_test.go +++ b/protocol/client/contact_test.go @@ -31,7 +31,7 @@ func TestContactMarshalUnmarshal(t *testing.T) { Name: "status", Type: ContactPublicRoom, }, - result: `{"name":"status","type":"ContactPublicRoom","state":0}`, + result: `{"name":"status","type":"ContactPublicRoom","state":0,"topic":""}`, }, { name: "ContactPublicKey", @@ -40,7 +40,7 @@ func TestContactMarshalUnmarshal(t *testing.T) { Type: ContactPublicKey, PublicKey: &privateKey.PublicKey, }, - result: fmt.Sprintf(`{"name":"user1","type":"ContactPublicKey","state":0,"public_key":"%s"}`, publicKeyStr), + result: fmt.Sprintf(`{"name":"user1","type":"ContactPublicKey","state":0,"topic":"","public_key":"%s"}`, publicKeyStr), }, } diff --git a/protocol/client/database.go b/protocol/client/database.go index f7b8672..774d22a 100644 --- a/protocol/client/database.go +++ b/protocol/client/database.go @@ -150,7 +150,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) { if err != nil { return err } - stmt, err = tx.Prepare("INSERT OR REPLACE INTO user_contacts(id, name, type, state, public_key) VALUES (?, ?, ?, ?, ?)") + stmt, err = tx.Prepare("INSERT OR REPLACE INTO user_contacts(id, name, type, state, topic, public_key) VALUES (?, ?, ?, ?, ?, ?)") if err != nil { return err } @@ -175,7 +175,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) { pkey := append([]byte{}, buf.Bytes()...) buf.Reset() id := fmt.Sprintf("%s:%d", contacts[i].Name, contacts[i].Type) - _, err = stmt.Exec(id, contacts[i].Name, contacts[i].Type, contacts[i].State, pkey) + _, err = stmt.Exec(id, contacts[i].Name, contacts[i].Type, contacts[i].State, contacts[i].Topic, pkey) if err != nil { return err } @@ -185,7 +185,7 @@ func (db SQLLiteDatabase) SaveContacts(contacts []Contact) (err error) { // Contacts returns all available contacts. func (db SQLLiteDatabase) Contacts() ([]Contact, error) { - rows, err := db.db.Query("SELECT name, type, state, public_key FROM user_contacts") + rows, err := db.db.Query("SELECT name, type, state, topic, public_key FROM user_contacts") if err != nil { return nil, err } @@ -200,7 +200,7 @@ func (db SQLLiteDatabase) Contacts() ([]Contact, error) { dec := gob.NewDecoder(&buf) contact := Contact{} pkey := []byte{} - err = rows.Scan(&contact.Name, &contact.Type, &contact.State, &pkey) + err = rows.Scan(&contact.Name, &contact.Type, &contact.State, &contact.Topic, &pkey) if err != nil { return nil, err } diff --git a/protocol/client/database_test.go b/protocol/client/database_test.go index 0d3b034..f7515d6 100644 --- a/protocol/client/database_test.go +++ b/protocol/client/database_test.go @@ -20,6 +20,7 @@ func TestContactReplacedBySameName(t *testing.T) { Name: "first", Type: ContactPublicRoom, PublicKey: &pk.PublicKey, + Topic: "first", } require.NoError(t, db.SaveContacts([]Contact{contact})) require.NoError(t, db.SaveContacts([]Contact{contact})) @@ -42,6 +43,7 @@ func TestMessagesFilteredAndOrderedByTimestamp(t *testing.T) { Name: "test", Type: ContactPublicRoom, PublicKey: &pk.PublicKey, + Topic: "first", } require.NoError(t, db.SaveContacts([]Contact{contact})) contacts, err := db.Contacts() @@ -75,8 +77,9 @@ func TestMessagesFilteredAndOrderedByTimestamp(t *testing.T) { func TestSaveMessagesUniqueConstraint(t *testing.T) { contact := Contact{ - Name: "test", - Type: ContactPublicRoom, + Name: "test", + Type: ContactPublicRoom, + Topic: "first", } sameid := []byte("1") msg1 := protocol.Message{ @@ -107,8 +110,9 @@ func TestGetLastMessageClock(t *testing.T) { } } contact := Contact{ - Name: "test", - Type: ContactPublicRoom, + Name: "test", + Type: ContactPublicRoom, + Topic: "first", } _, err = db.SaveMessages(contact, messages) require.NoError(t, err) @@ -127,6 +131,7 @@ func TestPublicContactExist(t *testing.T) { Name: "first", Type: ContactPublicKey, PublicKey: &pk.PublicKey, + Topic: "first", } require.NoError(t, db.SaveContacts([]Contact{contact})) exists, err := db.PublicContactExist(contact) @@ -145,16 +150,19 @@ func BenchmarkLoadMessages(b *testing.B) { Name: "first", Type: ContactPublicKey, PublicKey: &pk.PublicKey, + Topic: "test", }, { Name: "second", Type: ContactPublicKey, PublicKey: &pk.PublicKey, + Topic: "test", }, { Name: "third", Type: ContactPublicKey, PublicKey: &pk.PublicKey, + Topic: "test", }, } count := 10000 diff --git a/protocol/client/messenger_v2.go b/protocol/client/messenger_v2.go index 489f9dc..88b778a 100644 --- a/protocol/client/messenger_v2.go +++ b/protocol/client/messenger_v2.go @@ -18,7 +18,8 @@ func NewMessengerV2(identity *ecdsa.PrivateKey, proto protocol.Protocol, db Data proto: proto, db: NewDatabaseWithEvents(db, feed), - public: map[string]AsyncStream{}, + public: map[string]AsyncStream{}, + private: map[string]AsyncStream{}, // FIXME(dshulyak) add sufficient buffer to this channel // it may block stream that receives messages events: feed, @@ -32,22 +33,11 @@ type MessengerV2 struct { mu sync.Mutex public map[string]AsyncStream - private AsyncStream + private map[string]AsyncStream events chan interface{} } -func NewMessanger(identity *ecdsa.PrivateKey, db Database, proto protocol.Protocol) MessengerV2 { - return MessengerV2{ - identity: identity, - db: db, - proto: proto, - - public: map[string]AsyncStream{}, - events: make(chan interface{}), - } -} - func (m *MessengerV2) Start() error { m.mu.Lock() defer m.mu.Unlock() @@ -55,30 +45,36 @@ func (m *MessengerV2) Start() error { if err != nil { return errors.Wrap(err, "unable to read contacts from database") } - private := []Contact{} + for i := range contacts { + options, err := createSubscribeOptions(contacts[i]) + if err != nil { + return err + } if contacts[i].Type == ContactPublicKey { - private = append(private, contacts[i]) + _, exist := m.private[contacts[i].Topic] + if exist { + continue + } + stream := NewStream(context.Background(), options, m.proto, NewPrivateHandler(m.db)) + err := stream.Start() + if err != nil { + return errors.Wrap(err, "unable to start private stream") + } + m.private[contacts[i].Topic] = stream } else { - stream := NewStream(context.Background(), contacts[i], m.proto, NewPublicHandler(contacts[i], m.db)) + _, exist := m.public[contacts[i].Topic] + if exist { + return fmt.Errorf("multiple public chats with same topic: %s", contacts[i].Topic) + } + stream := NewStream(context.Background(), options, m.proto, NewPublicHandler(contacts[i], m.db)) err := stream.Start() if err != nil { return errors.Wrap(err, "unable to start stream") } - m.public[contacts[i].Name] = stream + m.public[contacts[i].Topic] = stream } } - // FIXME(dshulyak) even if we have no private contacts we still should start a stream for private messages. - // this requires moving topic one level higher, from whisper adapter to the client - if len(private) != 0 { - any := private[0] - stream := NewStream(context.Background(), any, m.proto, NewPrivateHandler(private, m.db)) - err := stream.Start() - if err != nil { - return errors.Wrap(err, "unable to start private stream") - } - m.private = stream - } log.Printf("[INFO] request messages from mail sever") return m.RequestAll(context.Background(), true) } @@ -86,25 +82,66 @@ func (m *MessengerV2) Start() error { func (m *MessengerV2) Join(ctx context.Context, c Contact) error { m.mu.Lock() defer m.mu.Unlock() - err := m.db.SaveContacts([]Contact{c}) + if c.Type == ContactPublicRoom { + return m.joinPublic(ctx, c) + } + return m.joinPrivate(ctx, c) +} + +func (m *MessengerV2) joinPrivate(ctx context.Context, c Contact) (err error) { + // FIXME(dshulyak) don't request messages on every join + // all messages must be requested in a single request when app starts + defer func() { + if err == nil { + err = m.Request(ctx, c, protocol.DefaultRequestOptions()) + } + }() + _, exist := m.private[c.Topic] + if exist { + return + } + var options protocol.SubscribeOptions + options, err = createSubscribeOptions(c) if err != nil { - return errors.Wrap(err, "can't add contact to db") + return err } - _, exist := m.public[c.Name] - if c.Type == ContactPublicKey || exist { - // FIXME(dshulyak) don't request messages on every join - // all messages must be requested in a single request when app starts - return m.Request(ctx, c, protocol.DefaultRequestOptions()) - } - log.Printf("[INFO] created stream for contact %s\n", c) - stream := NewStream(context.Background(), c, m.proto, NewPublicHandler(c, m.db)) + stream := NewStream(context.Background(), options, m.proto, NewPrivateHandler(m.db)) err = stream.Start() if err != nil { - return errors.Wrap(err, "can't subscribe to a stream") + err = errors.Wrap(err, "can't subscribe to a stream") + return err + } + m.private[c.Name] = stream + return +} + +func (m *MessengerV2) joinPublic(ctx context.Context, c Contact) (err error) { + // FIXME(dshulyak) don't request messages on every join + // all messages must be requested in a single request when app starts + defer func() { + if err == nil { + err = m.Request(ctx, c, protocol.DefaultRequestOptions()) + } + }() + _, exist := m.public[c.Topic] + if exist { + // FIXME(dshulyak) don't request messages on every join + // all messages must be requested in a single request when app starts + return + } + var options protocol.SubscribeOptions + options, err = createSubscribeOptions(c) + if err != nil { + return err + } + stream := NewStream(context.Background(), options, m.proto, NewPublicHandler(c, m.db)) + err = stream.Start() + if err != nil { + err = errors.Wrap(err, "can't subscribe to a stream") + return } m.public[c.Name] = stream - log.Printf("[INFO] made request for new messages contact %s\n", c) - return m.Request(ctx, c, protocol.DefaultRequestOptions()) + return } // Messages reads all messages from database. diff --git a/protocol/client/migrations/0001_add_messages_contacts.up.db.sql b/protocol/client/migrations/0001_add_messages_contacts.up.db.sql index a13515c..17d2bc4 100644 --- a/protocol/client/migrations/0001_add_messages_contacts.up.db.sql +++ b/protocol/client/migrations/0001_add_messages_contacts.up.db.sql @@ -16,6 +16,7 @@ CREATE INDEX contact_ids ON user_messages(contact_id); CREATE TABLE IF NOT EXISTS user_contacts ( id VARCHAR PRIMARY KEY NOT NULL, name VARCHAR NOT NULL, +topic TEXT NOT NULL, type INT NOT NULL, state INT, public_key BLOB diff --git a/protocol/client/migrations/bindata.go b/protocol/client/migrations/bindata.go index b4daf85..b946717 100644 --- a/protocol/client/migrations/bindata.go +++ b/protocol/client/migrations/bindata.go @@ -1,9 +1,3 @@ -// Code generated by go-bindata. -// sources: -// 0001_add_messages_contacts.down.db.sql -// 0001_add_messages_contacts.up.db.sql -// DO NOT EDIT! - package migrations import ( @@ -11,14 +5,10 @@ import ( "compress/gzip" "fmt" "io" - "io/ioutil" - "os" - "path/filepath" "strings" - "time" ) -func bindataRead(data []byte, name string) ([]byte, error) { +func bindata_read(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { return nil, fmt.Errorf("Read %q: %v", name, err) @@ -26,130 +16,44 @@ func bindataRead(data []byte, name string) ([]byte, error) { var buf bytes.Buffer _, err = io.Copy(&buf, gz) - clErr := gz.Close() + gz.Close() if err != nil { return nil, fmt.Errorf("Read %q: %v", name, err) } - if clErr != nil { - return nil, err - } return buf.Bytes(), nil } -type asset struct { - bytes []byte - info os.FileInfo -} +var __0001_add_messages_contacts_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xe3\x7e\xc7\x78\x34\x00\x00\x00") -type bindataFileInfo struct { - name string - size int64 - mode os.FileMode - modTime time.Time -} - -func (fi bindataFileInfo) Name() string { - return fi.name -} -func (fi bindataFileInfo) Size() int64 { - return fi.size -} -func (fi bindataFileInfo) Mode() os.FileMode { - return fi.mode -} -func (fi bindataFileInfo) ModTime() time.Time { - return fi.modTime -} -func (fi bindataFileInfo) IsDir() bool { - return false -} -func (fi bindataFileInfo) Sys() interface{} { - return nil -} - -var __0001_add_messages_contactsDownDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x42\x97\x49\xce\xcf\x2b\x49\x4c\x2e\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xe3\x7e\xc7\x78\x34\x00\x00\x00") - -func _0001_add_messages_contactsDownDbSqlBytes() ([]byte, error) { - return bindataRead( - __0001_add_messages_contactsDownDbSql, +func _0001_add_messages_contacts_down_db_sql() ([]byte, error) { + return bindata_read( + __0001_add_messages_contacts_down_db_sql, "0001_add_messages_contacts.down.db.sql", ) } -func _0001_add_messages_contactsDownDbSql() (*asset, error) { - bytes, err := _0001_add_messages_contactsDownDbSqlBytes() - if err != nil { - return nil, err - } +var __0001_add_messages_contacts_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xcd\x4e\xc3\x30\x10\x84\xef\x7e\x8a\x3d\x16\x29\x6f\xd0\x53\xd2\x1a\xba\x22\xd8\xe0\x3a\x34\x3d\x45\xc6\xac\x20\x6a\xf3\x23\xbc\x95\xe8\xdb\x23\xa2\x84\xb8\x55\xb9\xce\xce\xce\xee\x7c\x2b\x23\x53\x2b\xc1\xa6\x59\x2e\x01\xef\x41\x69\x0b\xb2\xc4\xad\xdd\xc2\x29\xd0\x57\xd5\x50\x08\xee\x83\x02\x2c\x44\xfd\x0e\xaf\xa9\x59\x6d\x52\x03\x85\xc2\x97\x42\x0e\x66\x55\xe4\x79\x22\x7c\xd7\xb2\xf3\x5c\x45\x9e\xcb\x21\xb5\x5c\xf1\xb9\xa7\x69\x9c\x88\x31\xf9\x4a\x65\xfa\x66\xb0\xb2\xb4\x89\xf0\xc7\xce\x1f\x20\xc3\x07\x54\x36\x11\x5c\x37\x14\xd8\x35\xfd\x9f\x32\xc5\xfa\x4f\x37\x1c\x1e\xb7\xa6\x63\x73\x50\x7f\x7a\x3b\xd6\xbe\x3a\xd0\x19\xb2\x5c\x67\xe2\x6e\x29\xc4\xd8\x1b\xd5\x5a\x96\x30\x7f\x1f\x40\xab\xcb\xe2\x8b\x79\x18\xed\xfd\xcb\x6b\x74\x5f\xf1\x7a\x36\xf8\x94\x9a\x3d\x3c\xca\x7d\xc4\xa5\x75\x0d\xdd\xc0\xc5\x5d\x5f\xfb\xe1\xf5\x58\xfc\xa5\x84\x2a\x96\x02\x3b\x1e\xb4\x1b\x0d\x61\x87\x76\xa3\x0b\x0b\x46\xef\x70\xbd\x14\x3f\x01\x00\x00\xff\xff\xb4\xd5\xa9\x88\xe7\x01\x00\x00") - info := bindataFileInfo{name: "0001_add_messages_contacts.down.db.sql", size: 52, mode: os.FileMode(436), modTime: time.Unix(1557300707, 0)} - a := &asset{bytes: bytes, info: info} - return a, nil -} - -var __0001_add_messages_contactsUpDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xc1\x4e\xc3\x30\x10\x44\xef\xfe\x8a\x3d\x16\x29\x7f\xd0\x93\xd3\x1a\xba\x22\xd8\xe0\x6e\x68\x7a\x8a\x8c\xb1\x20\x6a\x93\x46\x78\x2b\xd1\xbf\x47\x8a\x12\x92\x56\xe9\x75\x66\x3c\xeb\x79\x2b\xab\x24\x29\x20\x99\x66\x0a\xf0\x11\xb4\x21\x50\x05\x6e\x69\x0b\xe7\x18\x7e\xca\x3a\xc4\xe8\xbe\x42\x84\x85\xa8\x3e\xe1\x5d\xda\xd5\x46\x5a\xc8\x35\xbe\xe5\xaa\x0b\xeb\x3c\xcb\x12\xe1\x4f\x0d\x3b\xcf\xe5\x24\x73\x6d\x86\x86\x4b\xbe\xb4\x61\xb0\x13\xd1\x37\xdf\xa8\x1c\x7e\x19\x48\x15\x94\x08\x7f\x3c\xf9\x03\xa4\xf8\x84\x9a\x12\xc1\x55\x1d\x22\xbb\xba\xfd\x57\x86\x5a\xff\xed\xba\xc3\xfd\xab\xe1\xd8\x58\xd4\x9e\x3f\x8e\x95\x2f\x0f\xe1\x02\x69\x66\x52\xf1\xb0\x14\xa2\xdf\x8d\x7a\xad\x0a\x18\x7f\x1f\xc1\xe8\xeb\xe1\x8b\xd1\x9c\xbc\xbb\xcb\xab\x4f\xdf\xf0\x7a\xb5\xf8\x22\xed\x1e\x9e\xd5\x7e\xc2\xa5\x71\x75\x98\xc1\xd5\x01\x41\x4d\x13\x29\xb2\xe3\x4e\x9b\x19\x03\x3b\xa4\x8d\xc9\x09\xac\xd9\xe1\x7a\x29\xfe\x02\x00\x00\xff\xff\xe9\x0b\x8c\x5e\xd2\x01\x00\x00") - -func _0001_add_messages_contactsUpDbSqlBytes() ([]byte, error) { - return bindataRead( - __0001_add_messages_contactsUpDbSql, +func _0001_add_messages_contacts_up_db_sql() ([]byte, error) { + return bindata_read( + __0001_add_messages_contacts_up_db_sql, "0001_add_messages_contacts.up.db.sql", ) } -func _0001_add_messages_contactsUpDbSql() (*asset, error) { - bytes, err := _0001_add_messages_contactsUpDbSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "0001_add_messages_contacts.up.db.sql", size: 466, mode: os.FileMode(436), modTime: time.Unix(1557742893, 0)} - a := &asset{bytes: bytes, info: info} - return a, nil -} - // Asset loads and returns the asset for the given name. // It returns an error if the asset could not be found or // could not be loaded. func Asset(name string) ([]byte, error) { cannonicalName := strings.Replace(name, "\\", "/", -1) if f, ok := _bindata[cannonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) - } - return a.bytes, nil + return f() } return nil, fmt.Errorf("Asset %s not found", name) } -// MustAsset is like Asset but panics when Asset would return an error. -// It simplifies safe initialization of global variables. -func MustAsset(name string) []byte { - a, err := Asset(name) - if err != nil { - panic("asset: Asset(" + name + "): " + err.Error()) - } - - return a -} - -// AssetInfo loads and returns the asset info for the given name. -// It returns an error if the asset could not be found or -// could not be loaded. -func AssetInfo(name string) (os.FileInfo, error) { - cannonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[cannonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) - } - return a.info, nil - } - return nil, fmt.Errorf("AssetInfo %s not found", name) -} - // AssetNames returns the names of the assets. func AssetNames() []string { names := make([]string, 0, len(_bindata)) @@ -160,11 +64,10 @@ func AssetNames() []string { } // _bindata is a table, holding each asset generator, mapped to its name. -var _bindata = map[string]func() (*asset, error){ - "0001_add_messages_contacts.down.db.sql": _0001_add_messages_contactsDownDbSql, - "0001_add_messages_contacts.up.db.sql": _0001_add_messages_contactsUpDbSql, +var _bindata = map[string]func() ([]byte, error){ + "0001_add_messages_contacts.down.db.sql": _0001_add_messages_contacts_down_db_sql, + "0001_add_messages_contacts.up.db.sql": _0001_add_messages_contacts_up_db_sql, } - // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the @@ -194,65 +97,19 @@ func AssetDir(name string) ([]string, error) { return nil, fmt.Errorf("Asset %s not found", name) } rv := make([]string, 0, len(node.Children)) - for childName := range node.Children { - rv = append(rv, childName) + for name := range node.Children { + rv = append(rv, name) } return rv, nil } -type bintree struct { - Func func() (*asset, error) - Children map[string]*bintree +type _bintree_t struct { + Func func() ([]byte, error) + Children map[string]*_bintree_t } -var _bintree = &bintree{nil, map[string]*bintree{ - "0001_add_messages_contacts.down.db.sql": &bintree{_0001_add_messages_contactsDownDbSql, map[string]*bintree{}}, - "0001_add_messages_contacts.up.db.sql": &bintree{_0001_add_messages_contactsUpDbSql, map[string]*bintree{}}, +var _bintree = &_bintree_t{nil, map[string]*_bintree_t{ + "0001_add_messages_contacts.down.db.sql": &_bintree_t{_0001_add_messages_contacts_down_db_sql, map[string]*_bintree_t{ + }}, + "0001_add_messages_contacts.up.db.sql": &_bintree_t{_0001_add_messages_contacts_up_db_sql, map[string]*_bintree_t{ + }}, }} - -// RestoreAsset restores an asset under the given directory -func RestoreAsset(dir, name string) error { - data, err := Asset(name) - if err != nil { - return err - } - info, err := AssetInfo(name) - if err != nil { - return err - } - err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755)) - if err != nil { - return err - } - err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) - if err != nil { - return err - } - err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) - if err != nil { - return err - } - return nil -} - -// RestoreAssets restores an asset under the given directory recursively -func RestoreAssets(dir, name string) error { - children, err := AssetDir(name) - // File - if err != nil { - return RestoreAsset(dir, name) - } - // Dir - for _, child := range children { - err = RestoreAssets(dir, filepath.Join(name, child)) - if err != nil { - return err - } - } - return nil -} - -func _filePath(dir, name string) string { - cannonicalName := strings.Replace(name, "\\", "/", -1) - return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) -} - diff --git a/protocol/client/options.go b/protocol/client/options.go index 3ddbfd3..22f5364 100644 --- a/protocol/client/options.go +++ b/protocol/client/options.go @@ -11,9 +11,9 @@ var ( ) func createSubscribeOptions(c Contact) (opts protocol.SubscribeOptions, err error) { + opts.ChatName = c.Topic switch c.Type { case ContactPublicRoom: - opts.ChatName = c.Name case ContactPublicKey: opts.Recipient = c.PublicKey default: @@ -23,9 +23,9 @@ func createSubscribeOptions(c Contact) (opts protocol.SubscribeOptions, err erro } func createSendOptions(c Contact) (opts protocol.SendOptions, err error) { + opts.ChatName = c.Topic switch c.Type { case ContactPublicRoom: - opts.ChatName = c.Name case ContactPublicKey: opts.Recipient = c.PublicKey default: @@ -36,10 +36,9 @@ func createSendOptions(c Contact) (opts protocol.SendOptions, err error) { func enhanceRequestOptions(c Contact, opts *protocol.RequestOptions) error { var chatOptions protocol.ChatOptions - + chatOptions.ChatName = c.Topic switch c.Type { case ContactPublicRoom: - chatOptions.ChatName = c.Name case ContactPublicKey: chatOptions.Recipient = c.PublicKey default: diff --git a/protocol/client/stream.go b/protocol/client/stream.go index 3b74e3d..1cd90b0 100644 --- a/protocol/client/stream.go +++ b/protocol/client/stream.go @@ -43,14 +43,9 @@ func (pub PublicStream) Handle(msg protocol.Message) error { return nil } -func NewPrivateHandler(contacts []Contact, db Database) StreamHandler { - keyed := map[string]Contact{} - for i := range contacts { - keyed[PubkeyToHex(contacts[i].PublicKey)] = contacts[i] - } +func NewPrivateHandler(db Database) StreamHandler { return PrivateStream{ - contacts: keyed, - db: db, + db: db, }.Handle } @@ -58,8 +53,7 @@ func NewPrivateHandler(contacts []Contact, db Database) StreamHandler { // In our case every message will have a pubkey (derived from signature) that will be used // to determine who is the writer type PrivateStream struct { - contacts map[string]Contact // key is a hex from public key - db Database + db Database } func (priv PrivateStream) Handle(msg protocol.Message) error { @@ -67,13 +61,12 @@ func (priv PrivateStream) Handle(msg protocol.Message) error { return errors.New("message should be signed") } keyhex := PubkeyToHex(msg.SigPubKey) - // FIXME(dshulyak) Check if contact exist in database - // preferably don't marshal key as a blob contact := Contact{ Type: ContactPublicKey, State: ContactNew, Name: keyhex, // TODO(dshulyak) replace with 3-word funny name PublicKey: msg.SigPubKey, + Topic: DefaultPrivateTopic(), } exist, err := priv.db.PublicContactExist(contact) if err != nil { @@ -101,8 +94,7 @@ type AsyncStream interface { } type Stream struct { - // TODO replace contact with a topic. content from single stream can be delivered to multiple contacts. - contact Contact + options protocol.SubscribeOptions proto protocol.Protocol handler StreamHandler @@ -112,9 +104,9 @@ type Stream struct { wg sync.WaitGroup } -func NewStream(ctx context.Context, contact Contact, proto protocol.Protocol, handler StreamHandler) *Stream { +func NewStream(ctx context.Context, options protocol.SubscribeOptions, proto protocol.Protocol, handler StreamHandler) *Stream { return &Stream{ - contact: contact, + options: options, proto: proto, handler: handler, parent: ctx, @@ -128,11 +120,7 @@ func (stream *Stream) Start() error { ctx, cancel := context.WithCancel(stream.parent) stream.cancel = cancel msgs := make(chan *protocol.Message, 100) - opts, err := createSubscribeOptions(stream.contact) - if err != nil { - return errors.Wrap(err, "failed to create subscribe options") - } - sub, err := stream.proto.Subscribe(ctx, msgs, opts) + sub, err := stream.proto.Subscribe(ctx, msgs, stream.options) if err != nil { stream.cancel = nil return err diff --git a/protocol/client/stream_test.go b/protocol/client/stream_test.go index 621ae5e..1f2758c 100644 --- a/protocol/client/stream_test.go +++ b/protocol/client/stream_test.go @@ -31,7 +31,7 @@ func TestPrivateStreamSavesNewContactsAndMessages(t *testing.T) { pkey, err := crypto.GenerateKey() require.NoError(t, err) - private := NewPrivateHandler([]Contact{}, db) + private := NewPrivateHandler(db) msg := protocol.Message{ ID: []byte{1}, SigPubKey: &pkey.PublicKey, diff --git a/protocol/v1/protocol.go b/protocol/v1/protocol.go index 4a1f33e..5def16d 100644 --- a/protocol/v1/protocol.go +++ b/protocol/v1/protocol.go @@ -31,9 +31,6 @@ func (o ChatOptions) Validate() error { if o == (ChatOptions{}) { return errors.New("empty options") } - if o.ChatName != "" && o.Recipient != nil { - return errors.New("field ChatName and Recipient both set") - } return nil }