diff --git a/VERSION b/VERSION index 9b74e0051..8366fd98d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.32.0-beta.0 +0.33.0-beta.0 diff --git a/go.mod b/go.mod index 0091c9074..8eec712e1 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/status-im/doubleratchet v2.0.0+incompatible github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed github.com/status-im/rendezvous v1.3.0 - github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb + github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655 github.com/status-im/whisper v1.4.14 github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467 github.com/syndtr/goleveldb v1.0.0 diff --git a/go.sum b/go.sum index f6d55b00e..5a77cea1d 100644 --- a/go.sum +++ b/go.sum @@ -450,8 +450,8 @@ github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf0 github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA= github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4= github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s= -github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb h1:zJ7gHLGOw3CeTOri61Y6dO+j/vxlg8TKxFgydMWErnU= -github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb/go.mod h1:ASO3pqc3cEFT3LPKfClp4f39jbP1m2XvSmIfBk9Ssxc= +github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655 h1:7lJRfkt9fzt+wpL+1rgfhvFThs2yP8VJm3BHGbgfWsg= +github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655/go.mod h1:zEvtd2lNRzsqo4RCLU6WlK0EKFTGEoAAuYfj+lZjHNQ= github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc= github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -469,8 +469,8 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= -github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f h1:iBJcVbX4RaPtufXJ/PJtCF1jYPzlmyEF+imcQq5pTDs= -github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f/go.mod h1:Xc3UWtA230lUzZoXStJHQd/BkqJK5BAZyXBsiR6XrXM= +github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs= +github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/services/shhext/api.go b/services/shhext/api.go index 1a756a445..a4ee55a3f 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -571,8 +571,8 @@ func (api *PublicAPI) SaveChat(parent context.Context, chat statusproto.Chat) er return api.service.messenger.SaveChat(chat) } -func (api *PublicAPI) Chats(parent context.Context, to, from int) ([]*statusproto.Chat, error) { - return api.service.messenger.Chats(to, from) +func (api *PublicAPI) Chats(parent context.Context) ([]*statusproto.Chat, error) { + return api.service.messenger.Chats() } func (api *PublicAPI) DeleteChat(parent context.Context, chatID string) error { diff --git a/vendor/github.com/status-im/status-protocol-go/chat.go b/vendor/github.com/status-im/status-protocol-go/chat.go index f378dd63d..aec4b5eb5 100644 --- a/vendor/github.com/status-im/status-protocol-go/chat.go +++ b/vendor/github.com/status-im/status-protocol-go/chat.go @@ -1,6 +1,11 @@ package statusproto -import "crypto/ecdsa" +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" +) type ChatPagination struct { From uint @@ -79,3 +84,11 @@ type ChatMember struct { // Joined indicates if the member has joined the group chat Joined bool `json:"joined"` } + +func (c ChatMember) PublicKey() (*ecdsa.PublicKey, error) { + b, err := hexutil.Decode(c.ID) + if err != nil { + return nil, err + } + return crypto.UnmarshalPubkey(b) +} diff --git a/vendor/github.com/status-im/status-protocol-go/contact.go b/vendor/github.com/status-im/status-protocol-go/contact.go index 6d76ab632..cd00cb169 100644 --- a/vendor/github.com/status-im/status-protocol-go/contact.go +++ b/vendor/github.com/status-im/status-protocol-go/contact.go @@ -1,5 +1,18 @@ package statusproto +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" +) + +const ( + contactBlocked = "contact/blocked" + contactAdded = "contact/added" + contactRequestReceived = "contact/request-received" +) + // ContactDeviceInfo is a struct containing information about a particular device owned by a contact type ContactDeviceInfo struct { // The installation id of the device @@ -13,7 +26,7 @@ type ContactDeviceInfo struct { // Contact has information about a "Contact". A contact is not necessarily one // that we added or added us, that's based on SystemTags. type Contact struct { - // ID of the contact + // ID of the contact. It's a hex-encoded public key (prefixed with 0x). ID string `json:"id"` // Ethereum address of the contact Address string `json:"address"` @@ -31,3 +44,33 @@ type Contact struct { DeviceInfo []ContactDeviceInfo `json:"deviceInfo"` TributeToTalk string `json:"tributeToTalk"` } + +func (c Contact) PublicKey() (*ecdsa.PublicKey, error) { + b, err := hexutil.Decode(c.ID) + if err != nil { + return nil, err + } + return crypto.UnmarshalPubkey(b) +} + +func (c Contact) IsAdded() bool { + return existsInStringSlice(c.SystemTags, contactAdded) +} + +func (c Contact) HasBeenAdded() bool { + return existsInStringSlice(c.SystemTags, contactRequestReceived) +} + +func (c Contact) IsBlocked() bool { + return existsInStringSlice(c.SystemTags, contactBlocked) +} + +// existsInStringSlice checks if a string is in a set. +func existsInStringSlice(set []string, find string) bool { + for _, s := range set { + if s == find { + return true + } + } + return false +} diff --git a/vendor/github.com/status-im/status-protocol-go/go.mod b/vendor/github.com/status-im/status-protocol-go/go.mod index a6c8690e9..ac728f275 100644 --- a/vendor/github.com/status-im/status-protocol-go/go.mod +++ b/vendor/github.com/status-im/status-protocol-go/go.mod @@ -20,9 +20,7 @@ require ( github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed github.com/status-im/whisper v1.4.14 github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467 - github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f - go.uber.org/atomic v1.4.0 // indirect - go.uber.org/multierr v1.1.0 // indirect + github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c go.uber.org/zap v1.10.0 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect diff --git a/vendor/github.com/status-im/status-protocol-go/go.sum b/vendor/github.com/status-im/status-protocol-go/go.sum index 96c404c3a..1af32441e 100644 --- a/vendor/github.com/status-im/status-protocol-go/go.sum +++ b/vendor/github.com/status-im/status-protocol-go/go.sum @@ -252,8 +252,8 @@ github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2K github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= -github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f h1:iBJcVbX4RaPtufXJ/PJtCF1jYPzlmyEF+imcQq5pTDs= -github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f/go.mod h1:Xc3UWtA230lUzZoXStJHQd/BkqJK5BAZyXBsiR6XrXM= +github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs= +github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= diff --git a/vendor/github.com/status-im/status-protocol-go/messenger.go b/vendor/github.com/status-im/status-protocol-go/messenger.go index 85edba50b..ec2910551 100644 --- a/vendor/github.com/status-im/status-protocol-go/messenger.go +++ b/vendor/github.com/status-im/status-protocol-go/messenger.go @@ -249,6 +249,7 @@ func NewMessenger( nil, c.envelopesMonitorConfig, logger, + transport.SetGenericDiscoveryTopicSupport(c.featureFlags.genericDiscoveryTopicEnabled), ) if err != nil { return nil, errors.Wrap(err, "failed to create a WhisperServiceTransport") @@ -272,6 +273,7 @@ func NewMessenger( datasyncpeer.PublicKeyToPeerID(identity.PublicKey), datasyncnode.BATCH, datasync.CalculateSendTime, + logger, ) if err != nil { return nil, errors.Wrap(err, "failed to create a persistent datasync node") @@ -315,6 +317,66 @@ func NewMessenger( return messenger, nil } +// Init analyzes chats and contacts in order to setup filters +// which are responsible for retrieving messages. +func (m *Messenger) Init() error { + logger := m.logger.With(zap.String("site", "Init")) + + var ( + publicChatIDs []string + publicKeys []*ecdsa.PublicKey + ) + + // Get chat IDs and public keys from the existing chats. + // TODO: Get only active chats by the query. + chats, err := m.Chats() + if err != nil { + return err + } + for _, chat := range chats { + if !chat.Active { + continue + } + switch chat.ChatType { + case ChatTypePublic: + publicChatIDs = append(publicChatIDs, chat.ID) + case ChatTypeOneToOne: + publicKeys = append(publicKeys, chat.PublicKey) + case ChatTypePrivateGroupChat: + for _, member := range chat.Members { + publicKey, err := member.PublicKey() + if err != nil { + return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name) + } + publicKeys = append(publicKeys, publicKey) + } + default: + return errors.New("invalid chat type") + } + } + + // Get chat IDs and public keys from the contacts. + contacts, err := m.Contacts() + if err != nil { + return err + } + for _, contact := range contacts { + // We only need filters for contacts added by us and not blocked. + if !contact.IsAdded() || contact.IsBlocked() { + continue + } + publicKey, err := contact.PublicKey() + if err != nil { + logger.Error("failed to get contact's public key", zap.Error(err)) + continue + } + publicKeys = append(publicKeys, publicKey) + } + + _, err = m.adapter.transport.InitFilters(publicChatIDs, publicKeys) + return err +} + // Shutdown takes care of ensuring a clean shutdown of Messenger func (m *Messenger) Shutdown() (err error) { for _, task := range m.shutdownTasks { @@ -394,8 +456,8 @@ func (m *Messenger) SaveChat(chat Chat) error { return m.persistence.SaveChat(chat) } -func (m *Messenger) Chats(from, to int) ([]*Chat, error) { - return m.persistence.Chats(from, to) +func (m *Messenger) Chats() ([]*Chat, error) { + return m.persistence.Chats() } func (m *Messenger) DeleteChat(chatID string) error { @@ -592,13 +654,13 @@ func (m *Messenger) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMes } // DEPRECATED -func (m *Messenger) LoadFilters(chats []*transport.Filter) ([]*transport.Filter, error) { - return m.adapter.transport.LoadFilters(chats, m.featureFlags.genericDiscoveryTopicEnabled) +func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { + return m.adapter.transport.LoadFilters(filters) } // DEPRECATED -func (m *Messenger) RemoveFilters(chats []*transport.Filter) error { - return m.adapter.transport.RemoveFilters(chats) +func (m *Messenger) RemoveFilters(filters []*transport.Filter) error { + return m.adapter.transport.RemoveFilters(filters) } // DEPRECATED diff --git a/vendor/github.com/status-im/status-protocol-go/persistence.go b/vendor/github.com/status-im/status-protocol-go/persistence.go index 66cc2f530..8d9da524c 100644 --- a/vendor/github.com/status-im/status-protocol-go/persistence.go +++ b/vendor/github.com/status-im/status-protocol-go/persistence.go @@ -113,11 +113,11 @@ func (db sqlitePersistence) DeleteChat(chatID string) error { return err } -func (db sqlitePersistence) Chats(from, to int) ([]*Chat, error) { - return db.chats(from, to, nil) +func (db sqlitePersistence) Chats() ([]*Chat, error) { + return db.chats(nil) } -func (db sqlitePersistence) chats(from, to int, tx *sql.Tx) ([]*Chat, error) { +func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) { var err error if tx == nil { @@ -137,22 +137,22 @@ func (db sqlitePersistence) chats(from, to int, tx *sql.Tx) ([]*Chat, error) { } rows, err := tx.Query(`SELECT - id, - name, - color, - active, - type, - timestamp, - deleted_at_clock_value, - public_key, - unviewed_message_count, - last_clock_value, - last_message_content_type, - last_message_content, - members, - membership_updates + id, + name, + color, + active, + type, + timestamp, + deleted_at_clock_value, + public_key, + unviewed_message_count, + last_clock_value, + last_message_content_type, + last_message_content, + members, + membership_updates FROM chats - ORDER BY chats.timestamp DESC LIMIT ? OFFSET ?`, to, from) + ORDER BY chats.timestamp DESC`) if err != nil { return nil, err } diff --git a/vendor/github.com/status-im/status-protocol-go/persistence_legacy.go b/vendor/github.com/status-im/status-protocol-go/persistence_legacy.go index 6e75a5e29..f145f4d8c 100644 --- a/vendor/github.com/status-im/status-protocol-go/persistence_legacy.go +++ b/vendor/github.com/status-im/status-protocol-go/persistence_legacy.go @@ -377,5 +377,5 @@ func (db sqlitePersistence) BlockContact(contact Contact) ([]*Chat, error) { } // return the updated chats - return db.chats(0, -1, tx) + return db.chats(tx) } diff --git a/vendor/github.com/status-im/status-protocol-go/transport/whisper/filter.go b/vendor/github.com/status-im/status-protocol-go/transport/whisper/filter.go index 7f353cd72..b08dc3571 100644 --- a/vendor/github.com/status-im/status-protocol-go/transport/whisper/filter.go +++ b/vendor/github.com/status-im/status-protocol-go/transport/whisper/filter.go @@ -145,29 +145,29 @@ func (s *filtersManager) Init( s.mutex.Lock() defer s.mutex.Unlock() - var allChats []*Filter - for _, chat := range s.filters { - allChats = append(allChats, chat) + var allFilters []*Filter + for _, f := range s.filters { + allFilters = append(allFilters, f) } - return allChats, nil + return allFilters, nil } // DEPRECATED -func (s *filtersManager) InitWithChats(chats []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) { +func (s *filtersManager) InitWithFilters(filters []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) { var ( chatIDs []string publicKeys []*ecdsa.PublicKey ) - for _, chat := range chats { - if chat.Identity != "" && chat.OneToOne { - publicKey, err := strToPublicKey(chat.Identity) + for _, filter := range filters { + if filter.Identity != "" && filter.OneToOne { + publicKey, err := strToPublicKey(filter.Identity) if err != nil { return nil, err } publicKeys = append(publicKeys, publicKey) - } else if chat.ChatID != "" { - chatIDs = append(chatIDs, chat.ChatID) + } else if filter.ChatID != "" { + chatIDs = append(chatIDs, filter.ChatID) } } @@ -175,55 +175,55 @@ func (s *filtersManager) InitWithChats(chats []*Filter, genericDiscoveryTopicEna } func (s *filtersManager) Reset() error { - var chats []*Filter + var filters []*Filter s.mutex.Lock() - for _, chat := range s.filters { - chats = append(chats, chat) + for _, f := range s.filters { + filters = append(filters, f) } s.mutex.Unlock() - return s.Remove(chats...) + return s.Remove(filters...) } -func (s *filtersManager) Chats() (result []*Filter) { +func (s *filtersManager) Filters() (result []*Filter) { s.mutex.Lock() defer s.mutex.Unlock() - for _, chat := range s.filters { - result = append(result, chat) + for _, f := range s.filters { + result = append(result, f) } return } -// ChatByID returns a chat by id. -func (s *filtersManager) ChatByID(chatID string) *Filter { +func (s *filtersManager) Filter(chatID string) *Filter { s.mutex.Lock() defer s.mutex.Unlock() return s.filters[chatID] } -func (s *filtersManager) ChatByFilterID(filterID string) *Filter { +// FilterByFilterID returns a Filter with a given Whisper filter ID. +func (s *filtersManager) FilterByFilterID(filterID string) *Filter { s.mutex.Lock() defer s.mutex.Unlock() - for _, chat := range s.filters { - if chat.FilterID == filterID { - return chat + for _, f := range s.filters { + if f.FilterID == filterID { + return f } } return nil } -func (s *filtersManager) ChatsByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) { +func (s *filtersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) { s.mutex.Lock() defer s.mutex.Unlock() identityStr := publicKeyToStr(publicKey) - for _, chat := range s.filters { - if chat.Identity == identityStr { - result = append(result, chat) + for _, f := range s.filters { + if f.Identity == identityStr { + result = append(result, f) } } @@ -231,18 +231,18 @@ func (s *filtersManager) ChatsByPublicKey(publicKey *ecdsa.PublicKey) (result [] } // Remove remove all the filters associated with a chat/identity -func (s *filtersManager) Remove(chats ...*Filter) error { +func (s *filtersManager) Remove(filters ...*Filter) error { s.mutex.Lock() defer s.mutex.Unlock() - for _, chat := range chats { - if err := s.whisper.Unsubscribe(chat.FilterID); err != nil { + for _, f := range filters { + if err := s.whisper.Unsubscribe(f.FilterID); err != nil { return err } - if chat.SymKeyID != "" { - s.whisper.DeleteSymKey(chat.SymKeyID) + if f.SymKeyID != "" { + s.whisper.DeleteSymKey(f.SymKeyID) } - delete(s.filters, chat.ChatID) + delete(s.filters, f.ChatID) } return nil diff --git a/vendor/github.com/status-im/status-protocol-go/transport/whisper/whisper_service.go b/vendor/github.com/status-im/status-protocol-go/transport/whisper/whisper_service.go index db0ccbfee..e7355d58f 100644 --- a/vendor/github.com/status-im/status-protocol-go/transport/whisper/whisper_service.go +++ b/vendor/github.com/status-im/status-protocol-go/transport/whisper/whisper_service.go @@ -56,16 +56,27 @@ func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) { return m.shh.GetSymKey(id) } +type Option func(*WhisperServiceTransport) error + +func SetGenericDiscoveryTopicSupport(val bool) Option { + return func(t *WhisperServiceTransport) error { + t.genericDiscoveryTopicEnabled = val + return nil + } +} + // WhisperServiceTransport is a transport based on Whisper service. type WhisperServiceTransport struct { shh *whisper.Whisper shhAPI *whisper.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages keysManager *whisperServiceKeysManager - chats *filtersManager + filters *filtersManager logger *zap.Logger mailservers []string envelopesMonitor *EnvelopesMonitor + + genericDiscoveryTopicEnabled bool } // NewWhisperService returns a new WhisperServiceTransport. @@ -76,8 +87,9 @@ func NewWhisperServiceTransport( mailservers []string, envelopesMonitorConfig *EnvelopesMonitorConfig, logger *zap.Logger, + opts ...Option, ) (*WhisperServiceTransport, error) { - chats, err := newFiltersManager(db, shh, privateKey, logger) + filtersManager, err := newFiltersManager(db, shh, privateKey, logger) if err != nil { return nil, err } @@ -87,38 +99,54 @@ func NewWhisperServiceTransport( envelopesMonitor = NewEnvelopesMonitor(shh, envelopesMonitorConfig) envelopesMonitor.Start() } - return &WhisperServiceTransport{ + + t := &WhisperServiceTransport{ shh: shh, shhAPI: whisper.NewPublicWhisperAPI(shh), envelopesMonitor: envelopesMonitor, - keysManager: &whisperServiceKeysManager{ shh: shh, privateKey: privateKey, passToSymKeyCache: make(map[string]string), }, - chats: chats, + filters: filtersManager, mailservers: mailservers, logger: logger.With(zap.Namespace("WhisperServiceTransport")), - }, nil + } + + for _, opt := range opts { + if err := opt(t); err != nil { + return nil, err + } + } + + return t, nil +} + +func (a *WhisperServiceTransport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) { + return a.filters.Init(chatIDs, publicKeys, a.genericDiscoveryTopicEnabled) +} + +func (a *WhisperServiceTransport) Filters() []*Filter { + return a.filters.Filters() } // DEPRECATED -func (a *WhisperServiceTransport) LoadFilters(chats []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) { - return a.chats.InitWithChats(chats, genericDiscoveryTopicEnabled) +func (a *WhisperServiceTransport) LoadFilters(filters []*Filter) ([]*Filter, error) { + return a.filters.InitWithFilters(filters, a.genericDiscoveryTopicEnabled) } // DEPRECATED -func (a *WhisperServiceTransport) RemoveFilters(chats []*Filter) error { - return a.chats.Remove(chats...) +func (a *WhisperServiceTransport) RemoveFilters(filters []*Filter) error { + return a.filters.Remove(filters...) } func (a *WhisperServiceTransport) Reset() error { - return a.chats.Reset() + return a.filters.Reset() } func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret NegotiatedSecret) (*Filter, error) { - filter, err := a.chats.LoadNegotiated(secret) + filter, err := a.filters.LoadNegotiated(secret) if err != nil { return nil, err } @@ -126,26 +154,26 @@ func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret NegotiatedSecre } func (a *WhisperServiceTransport) JoinPublic(chatID string) error { - _, err := a.chats.LoadPublic(chatID) + _, err := a.filters.LoadPublic(chatID) return err } func (a *WhisperServiceTransport) LeavePublic(chatID string) error { - chat := a.chats.ChatByID(chatID) + chat := a.filters.Filter(chatID) if chat != nil { return nil } - return a.chats.Remove(chat) + return a.filters.Remove(chat) } func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error { - _, err := a.chats.LoadContactCode(publicKey) + _, err := a.filters.LoadContactCode(publicKey) return err } func (a *WhisperServiceTransport) LeavePrivate(publicKey *ecdsa.PublicKey) error { - chats := a.chats.ChatsByPublicKey(publicKey) - return a.chats.Remove(chats...) + filters := a.filters.FiltersByPublicKey(publicKey) + return a.filters.Remove(filters...) } type ChatMessages struct { @@ -157,15 +185,15 @@ type ChatMessages struct { func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error) { chatMessages := make(map[string]ChatMessages) - for _, chat := range a.chats.Chats() { - f := a.shh.GetFilter(chat.FilterID) + for _, filter := range a.filters.Filters() { + f := a.shh.GetFilter(filter.FilterID) if f == nil { return nil, errors.New("failed to return a filter") } - messages := chatMessages[chat.ChatID] - messages.ChatID = chat.ChatID - messages.Public = chat.IsPublic() + messages := chatMessages[filter.ChatID] + messages.ChatID = filter.ChatID + messages.Public = filter.IsPublic() messages.Messages = append(messages.Messages, f.Retrieve()...) } @@ -177,12 +205,12 @@ func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error) } func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whisper.ReceivedMessage, error) { - chat, err := a.chats.LoadPublic(chatID) + filter, err := a.filters.LoadPublic(chatID) if err != nil { return nil, err } - f := a.shh.GetFilter(chat.FilterID) + f := a.shh.GetFilter(filter.FilterID) if f == nil { return nil, errors.New("failed to return a filter") } @@ -191,8 +219,8 @@ func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whis } func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*whisper.ReceivedMessage, error) { - chats := a.chats.ChatsByPublicKey(publicKey) - discoveryChats, err := a.chats.Init(nil, nil, true) + chats := a.filters.FiltersByPublicKey(publicKey) + discoveryChats, err := a.filters.Init(nil, nil, true) if err != nil { return nil, err } @@ -215,14 +243,14 @@ func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.Publi func (a *WhisperServiceTransport) RetrieveRawAll() (map[Filter][]*whisper.ReceivedMessage, error) { result := make(map[Filter][]*whisper.ReceivedMessage) - allChats := a.chats.Chats() - for _, chat := range allChats { - f := a.shh.GetFilter(chat.FilterID) + allFilters := a.filters.Filters() + for _, filter := range allFilters { + f := a.shh.GetFilter(filter.FilterID) if f == nil { return nil, errors.New("failed to return a filter") } - result[*chat] = append(result[*chat], f.Retrieve()...) + result[*filter] = append(result[*filter], f.Retrieve()...) } return result, nil @@ -245,13 +273,13 @@ func (a *WhisperServiceTransport) SendPublic(ctx context.Context, newMessage *wh return nil, err } - chat, err := a.chats.LoadPublic(chatName) + filter, err := a.filters.LoadPublic(chatName) if err != nil { return nil, err } - newMessage.SymKeyID = chat.SymKeyID - newMessage.Topic = chat.Topic + newMessage.SymKeyID = filter.SymKeyID + newMessage.Topic = filter.Topic return a.shhAPI.Post(ctx, *newMessage) } @@ -261,7 +289,7 @@ func (a *WhisperServiceTransport) SendPrivateWithSharedSecret(ctx context.Contex return nil, err } - chat, err := a.chats.LoadNegotiated(NegotiatedSecret{ + filter, err := a.filters.LoadNegotiated(NegotiatedSecret{ PublicKey: publicKey, Key: secret, }) @@ -269,8 +297,8 @@ func (a *WhisperServiceTransport) SendPrivateWithSharedSecret(ctx context.Contex return nil, err } - newMessage.SymKeyID = chat.SymKeyID - newMessage.Topic = chat.Topic + newMessage.SymKeyID = filter.SymKeyID + newMessage.Topic = filter.Topic newMessage.PublicKey = nil return a.shhAPI.Post(ctx, *newMessage) @@ -281,12 +309,12 @@ func (a *WhisperServiceTransport) SendPrivateWithPartitioned(ctx context.Context return nil, err } - chat, err := a.chats.LoadPartitioned(publicKey) + filter, err := a.filters.LoadPartitioned(publicKey) if err != nil { return nil, err } - newMessage.Topic = chat.Topic + newMessage.Topic = filter.Topic newMessage.PublicKey = crypto.FromECDSAPub(publicKey) return a.shhAPI.Post(ctx, *newMessage) diff --git a/vendor/github.com/vacp2p/mvds/node/node.go b/vendor/github.com/vacp2p/mvds/node/node.go index 0a547e595..400e694c0 100644 --- a/vendor/github.com/vacp2p/mvds/node/node.go +++ b/vendor/github.com/vacp2p/mvds/node/node.go @@ -6,11 +6,13 @@ package node import ( "context" "database/sql" + "encoding/hex" "fmt" - "log" "sync/atomic" "time" + "go.uber.org/zap" + "github.com/vacp2p/mvds/peers" "github.com/vacp2p/mvds/protobuf" "github.com/vacp2p/mvds/state" @@ -31,6 +33,8 @@ type CalculateNextEpoch func(count uint64, epoch int64) int64 // Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages. type Node struct { + // This needs to be declared first: https://github.com/golang/go/issues/9959 + epoch int64 ctx context.Context cancel context.CancelFunc @@ -48,10 +52,11 @@ type Node struct { ID state.PeerID epochPersistence *epochSQLitePersistence - epoch int64 mode Mode subscription chan protobuf.Message + + logger *zap.Logger } func NewPersistentNode( @@ -60,8 +65,13 @@ func NewPersistentNode( id state.PeerID, mode Mode, nextEpoch CalculateNextEpoch, + logger *zap.Logger, ) (*Node, error) { ctx, cancel := context.WithCancel(context.Background()) + if logger == nil { + logger = zap.NewNop() + } + node := Node{ ID: id, ctx: ctx, @@ -73,6 +83,7 @@ func NewPersistentNode( payloads: newPayloads(), epochPersistence: newEpochSQLitePersistence(db), nextEpoch: nextEpoch, + logger: logger.With(zap.Namespace("mvds")), mode: mode, } if currentEpoch, err := node.epochPersistence.Get(id); err != nil { @@ -89,8 +100,13 @@ func NewEphemeralNode( nextEpoch CalculateNextEpoch, currentEpoch int64, mode Mode, + logger *zap.Logger, ) *Node { ctx, cancel := context.WithCancel(context.Background()) + if logger == nil { + logger = zap.NewNop() + } + return &Node{ ID: id, ctx: ctx, @@ -102,6 +118,7 @@ func NewEphemeralNode( payloads: newPayloads(), nextEpoch: nextEpoch, epoch: currentEpoch, + logger: logger.With(zap.Namespace("mvds")), mode: mode, } } @@ -116,8 +133,13 @@ func NewNode( id state.PeerID, mode Mode, pp peers.Persistence, + logger *zap.Logger, ) *Node { ctx, cancel := context.WithCancel(context.Background()) + if logger == nil { + logger = zap.NewNop() + } + return &Node{ ctx: ctx, cancel: cancel, @@ -129,6 +151,7 @@ func NewNode( nextEpoch: nextEpoch, ID: id, epoch: currentEpoch, + logger: logger.With(zap.Namespace("mvds")), mode: mode, } } @@ -143,7 +166,7 @@ func (n *Node) Start(duration time.Duration) { for { select { case <-n.ctx.Done(): - log.Print("Watch stopped") + n.logger.Info("Watch stopped") return default: p := n.transport.Watch() @@ -156,20 +179,20 @@ func (n *Node) Start(duration time.Duration) { for { select { case <-n.ctx.Done(): - log.Print("Epoch processing stopped") + n.logger.Info("Epoch processing stopped") return default: - log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch) + n.logger.Debug("Epoch processing", zap.String("node", hex.EncodeToString(n.ID[:4])), zap.Int64("epoch", n.epoch)) time.Sleep(duration) err := n.sendMessages() if err != nil { - log.Printf("Error sending messages: %+v\n", err) + n.logger.Error("Error sending messages.", zap.Error(err)) } atomic.AddInt64(&n.epoch, 1) // When a persistent node is used, the epoch needs to be saved. if n.epochPersistence != nil { if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil { - log.Printf("Failed to persisten epoch: %v", err) + n.logger.Error("Failed to persisten epoch", zap.Error(err)) } } } @@ -179,7 +202,7 @@ func (n *Node) Start(duration time.Duration) { // Stop message reading and epoch processing func (n *Node) Stop() { - log.Print("Stopping node") + n.logger.Info("Stopping node") n.Unsubscribe() n.cancel() } @@ -227,7 +250,10 @@ func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageI n.insertSyncState(&groupID, id, p, t) } - log.Printf("[%x] node %x sending %x\n", groupID[:4], n.ID[:4], id[:4]) + n.logger.Debug("Sending message", + zap.String("node", hex.EncodeToString(n.ID[:4])), + zap.String("groupID", hex.EncodeToString(groupID[:4])), + zap.String("id", hex.EncodeToString(id[:4]))) // @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here return id, nil @@ -275,7 +301,12 @@ func (n *Node) sendMessages() error { n.payloads.AddOffers(p, m[:]) case state.REQUEST: n.payloads.AddRequests(p, m[:]) - log.Printf("sending REQUEST (%x -> %x): %x\n", n.ID[:4], p[:4], m[:4]) + n.logger.Debug("sending REQUEST", + zap.String("from", hex.EncodeToString(n.ID[:4])), + zap.String("to", hex.EncodeToString(p[:4])), + zap.String("messageID", hex.EncodeToString(m[:4])), + ) + case state.MESSAGE: g := *s.GroupID // TODO: Handle errors @@ -290,26 +321,36 @@ func (n *Node) sendMessages() error { msg, err := n.store.Get(m) if err != nil { - log.Printf("failed to retreive message %x %s", m[:4], err.Error()) + n.logger.Error("Failed to retreive message", + zap.String("messageID", hex.EncodeToString(m[:4])), + zap.Error(err), + ) + return s } n.payloads.AddMessages(p, msg) - log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", g[:4], n.ID[:4], p[:4], m[:4]) + n.logger.Debug("sending MESSAGE", + zap.String("groupID", hex.EncodeToString(g[:4])), + zap.String("from", hex.EncodeToString(n.ID[:4])), + zap.String("to", hex.EncodeToString(p[:4])), + zap.String("messageID", hex.EncodeToString(m[:4])), + ) + } return n.updateSendEpoch(s) }) if err != nil { - log.Printf("error while mapping sync state: %s", err.Error()) + n.logger.Error("error while mapping sync state", zap.Error(err)) return err } return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error { err := n.transport.Send(n.ID, peer, payload) if err != nil { - log.Printf("error sending message: %s", err.Error()) + n.logger.Error("error sending message", zap.Error(err)) return err } return nil @@ -320,13 +361,13 @@ func (n *Node) sendMessages() error { func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) { // Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer. if err := n.onAck(sender, payload.Acks); err != nil { - log.Printf("error processing acks: %s", err.Error()) + n.logger.Error("error processing acks", zap.Error(err)) } if err := n.onRequest(sender, payload.Requests); err != nil { - log.Printf("error processing requests: %s", err.Error()) + n.logger.Error("error processing requests", zap.Error(err)) } if err := n.onOffer(sender, payload.Offers); err != nil { - log.Printf("error processing offers: %s", err.Error()) + n.logger.Error("error processing offers", zap.Error(err)) } messageIds := n.onMessages(sender, payload.Messages) n.payloads.AddAcks(sender, messageIds) @@ -335,7 +376,11 @@ func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) { func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error { for _, raw := range offers { id := toMessageID(raw) - log.Printf("OFFER (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) + n.logger.Debug("OFFER received", + zap.String("from", hex.EncodeToString(sender[:4])), + zap.String("to", hex.EncodeToString(n.ID[:4])), + zap.String("messageID", hex.EncodeToString(id[:4])), + ) exist, err := n.store.Has(id) // @todo maybe ack? @@ -355,7 +400,11 @@ func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error { func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error { for _, raw := range requests { id := toMessageID(raw) - log.Printf("REQUEST (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) + n.logger.Debug("REQUEST received", + zap.String("from", hex.EncodeToString(sender[:4])), + zap.String("to", hex.EncodeToString(n.ID[:4])), + zap.String("messageID", hex.EncodeToString(id[:4])), + ) message, err := n.store.Get(id) if err != nil { @@ -363,7 +412,7 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error { } if message == nil { - log.Printf("message %x does not exist", id[:4]) + n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(id[:4]))) continue } @@ -375,7 +424,10 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error { } if !exist { - log.Printf("[%x] peer %x is not in group", groupID, sender[:4]) + n.logger.Error("peer is not in group", + zap.String("groupID", hex.EncodeToString(groupID[:4])), + zap.String("peer", hex.EncodeToString(sender[:4])), + ) continue } @@ -391,11 +443,16 @@ func (n *Node) onAck(sender state.PeerID, acks [][]byte) error { err := n.syncState.Remove(id, sender) if err != nil { - log.Printf("error while removing sync state %s", err.Error()) + n.logger.Error("Error while removing sync state.", zap.Error(err)) return err } - log.Printf("ACK (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) + n.logger.Debug("ACK received", + zap.String("from", hex.EncodeToString(sender[:4])), + zap.String("to", hex.EncodeToString(n.ID[:4])), + zap.String("messageID", hex.EncodeToString(id[:4])), + ) + } return nil } @@ -407,12 +464,18 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][ groupID := toGroupID(m.GroupId) err := n.onMessage(sender, *m) if err != nil { - log.Printf("Error processing messsage: %+v\n", err) + n.logger.Error("Error processing message", zap.Error(err)) continue } id := m.ID() - log.Printf("[%x] sending ACK (%x -> %x): %x\n", groupID[:4], n.ID[:4], sender[:4], id[:4]) + n.logger.Debug("sending ACK", + zap.String("groupID", hex.EncodeToString(groupID[:4])), + zap.String("from", hex.EncodeToString(n.ID[:4])), + zap.String("", hex.EncodeToString(sender[:4])), + zap.String("messageID", hex.EncodeToString(id[:4])), + ) + a = append(a, id[:]) } @@ -422,7 +485,11 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error { id := msg.ID() groupID := toGroupID(msg.GroupId) - log.Printf("MESSAGE (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4]) + n.logger.Debug("MESSAGE received", + zap.String("from", hex.EncodeToString(sender[:4])), + zap.String("to", hex.EncodeToString(n.ID[:4])), + zap.String("messageID", hex.EncodeToString(id[:4])), + ) err := n.syncState.Remove(id, sender) if err != nil { @@ -466,7 +533,13 @@ func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID err := n.syncState.Add(s) if err != nil { - log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), groupID, messageID, peerID) + n.logger.Error("error setting sync states", + zap.Error(err), + zap.String("groupID", hex.EncodeToString(groupID[:4])), + zap.String("messageID", hex.EncodeToString(messageID[:4])), + zap.String("peerID", hex.EncodeToString(peerID[:4])), + ) + } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 0136d74e9..2bc18b5db 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -31,6 +31,7 @@ github.com/ethereum/go-ethereum/common github.com/ethereum/go-ethereum/common/hexutil github.com/ethereum/go-ethereum/crypto github.com/ethereum/go-ethereum/ethclient +github.com/ethereum/go-ethereum/event github.com/ethereum/go-ethereum/log github.com/ethereum/go-ethereum/node github.com/ethereum/go-ethereum/p2p/enode @@ -40,7 +41,6 @@ github.com/ethereum/go-ethereum/metrics github.com/ethereum/go-ethereum github.com/ethereum/go-ethereum/crypto/ecies github.com/ethereum/go-ethereum/p2p/enr -github.com/ethereum/go-ethereum/event github.com/ethereum/go-ethereum/rlp github.com/ethereum/go-ethereum/accounts/abi github.com/ethereum/go-ethereum/accounts/abi/bind @@ -317,7 +317,7 @@ github.com/status-im/migrate/v4/database/sqlcipher github.com/status-im/rendezvous github.com/status-im/rendezvous/protocol github.com/status-im/rendezvous/server -# github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb +# github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655 github.com/status-im/status-protocol-go/zaputil github.com/status-im/status-protocol-go github.com/status-im/status-protocol-go/encryption/multidevice @@ -353,7 +353,7 @@ github.com/syndtr/goleveldb/leveldb/filter github.com/syndtr/goleveldb/leveldb/journal github.com/syndtr/goleveldb/leveldb/memdb github.com/syndtr/goleveldb/leveldb/table -# github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f +# github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c github.com/vacp2p/mvds/node github.com/vacp2p/mvds/protobuf github.com/vacp2p/mvds/state