Request info about community from mailserver (#2177)

This commit is contained in:
Volodymyr Kozieiev 2021-04-19 15:09:46 +03:00 committed by GitHub
parent 07a713cc71
commit 8a370c1c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 293 additions and 23 deletions

View File

@ -1 +1 @@
0.76.2
0.76.3

2
go.mod
View File

@ -54,10 +54,12 @@ require (
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.0
github.com/prometheus/common v0.9.1
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-ethereum v1.9.1 // indirect
github.com/status-im/keycard-go v0.0.0-20200107115650-f38e9a19958e // indirect
github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf
github.com/status-im/migrate/v4 v4.6.2-status.2

3
go.sum
View File

@ -647,6 +647,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/status-im/doubleratchet v3.0.0+incompatible h1:aJ1ejcSERpSzmWZBgtfYtiU2nF0Q8ZkGyuEPYETXkCY=
github.com/status-im/doubleratchet v3.0.0+incompatible/go.mod h1:1sqR0+yhiM/bd+wrdX79AOt2csZuJOni0nUDzKNuqOU=
github.com/status-im/go-ethereum v1.9.1 h1:N3E3g++Isr4OORPVPwWFlaHp+3EOV9X4A88MbcRsk2Y=
github.com/status-im/go-ethereum v1.9.1/go.mod h1:rdSZTVW2L1V6dsRpW77o4gwqWRUYfb/e9nNHcIgVEO4=
github.com/status-im/go-ethereum v1.9.5-status.10 h1:QAPL3CMm84nHQG08wfra3HpLUehk+DxaQYMAfda0BzY=
github.com/status-im/go-ethereum v1.9.5-status.10/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g=
github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk=
@ -763,6 +765,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c h1:/nJuwDLoL/zrqY6gf57vxC+Pi+pZ8bfhpPkicO5H7W4=
golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=

View File

@ -133,8 +133,33 @@ func (o *Community) MarshalJSON() ([]byte, error) {
}
func (o *Community) Name() string {
if o != nil &&
o.config != nil &&
o.config.CommunityDescription != nil &&
o.config.CommunityDescription.Identity != nil {
return o.config.CommunityDescription.Identity.DisplayName
}
return ""
}
func (o *Community) Description() string {
if o != nil &&
o.config != nil &&
o.config.CommunityDescription != nil &&
o.config.CommunityDescription.Identity != nil {
return o.config.CommunityDescription.Identity.Description
}
return ""
}
func (o *Community) MembersCount() int {
if o != nil &&
o.config != nil &&
o.config.CommunityDescription != nil {
return len(o.config.CommunityDescription.Members)
}
return 0
}
func (o *Community) initialize() {
if o.config.CommunityDescription == nil {

View File

@ -105,6 +105,7 @@ type Messenger struct {
account *multiaccounts.Account
mailserversDatabase *mailservers.Database
quit chan struct{}
requestedCommunities map[string]*transport.Filter
// TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed
mutex sync.Mutex
@ -313,6 +314,7 @@ func NewMessenger(
mailserversDatabase: c.mailserversDatabase,
account: c.account,
quit: make(chan struct{}),
requestedCommunities: make(map[string]*transport.Filter),
shutdownTasks: []func() error{
ensVerifier.Stop,
pushNotificationClient.Stop,
@ -2403,13 +2405,13 @@ func (m *Messenger) markDeliveredMessages(acks [][]byte) {
}
//send signal to client that message status updated
if m.config.messageDeliveredHandler != nil {
if m.config.messengerSignalsHandler != nil {
message, err := m.persistence.MessageByID(messageID)
if err != nil {
m.logger.Debug("Can't get message from database", zap.Error(err))
continue
}
m.config.messageDeliveredHandler(message.LocalChatID, messageID)
m.config.messengerSignalsHandler.MessageDelivered(message.LocalChatID, messageID)
}
}
}
@ -2795,6 +2797,14 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
allMessagesProcessed = false
continue
}
//if community was among requested ones, send its info and remove filter
for communityID := range m.requestedCommunities {
if _, ok := messageState.Response.communities[communityID]; ok {
m.passStoredCommunityInfoToSignalHandler(communityID)
}
}
case protobuf.CommunityInvitation:
logger.Debug("Handling CommunityInvitation")
invitation := msg.ParsedMessage.Interface().(protobuf.CommunityInvitation)
@ -3008,11 +3018,26 @@ func (m *Messenger) RequestHistoricMessages(
ctx context.Context,
from, to uint32,
cursor []byte,
waitForResponse bool,
) ([]byte, error) {
if m.mailserver == nil {
return nil, errors.New("no mailserver selected")
}
return m.transport.SendMessagesRequest(ctx, m.mailserver, from, to, cursor)
return m.transport.SendMessagesRequest(ctx, m.mailserver, from, to, cursor, waitForResponse)
}
func (m *Messenger) RequestHistoricMessagesForFilter(
ctx context.Context,
from, to uint32,
cursor []byte,
filter *transport.Filter,
waitForResponse bool,
) ([]byte, error) {
if m.mailserver == nil {
return nil, errors.New("no mailserver selected")
}
return m.transport.SendMessagesRequestForFilter(ctx, m.mailserver, from, to, cursor, filter, waitForResponse)
}
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {

View File

@ -3,6 +3,7 @@ package protocol
import (
"context"
"crypto/ecdsa"
"fmt"
"time"
"github.com/golang/protobuf/proto"
@ -14,6 +15,7 @@ import (
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
)
const communityInvitationText = "Upgrade to see a community invitation"
@ -371,6 +373,10 @@ func (m *Messenger) ImportCommunity(key *ecdsa.PrivateKey) (*MessengerResponse,
return nil, err
}
//request info already stored on mailserver, but its success is not crucial
// for import
_ = m.RequestCommunityInfoFromMailserver(org.IDString())
return &MessengerResponse{
Filters: filters,
}, nil
@ -493,3 +499,97 @@ func (m *Messenger) BanUserFromCommunity(request *requests.BanUserFromCommunity)
response.AddCommunity(community)
return response, nil
}
// RequestCommunityInfoFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler
func (m *Messenger) RequestCommunityInfoFromMailserver(communityID string) error {
if _, ok := m.requestedCommunities[communityID]; ok {
return nil
}
//If filter wasn't installed we create it and remember for deinstalling after
//response received
filter := m.transport.FilterByChatID(communityID)
if filter == nil {
filters, err := m.transport.InitPublicFilters([]string{communityID})
if err != nil {
return fmt.Errorf("Can't install filter for community: %v", err)
}
if len(filters) != 1 {
return fmt.Errorf("Unexpected amount of filters created")
}
filter = filters[0]
m.requestedCommunities[communityID] = filter
} else {
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[communityID] = nil
}
now := uint32(m.transport.GetCurrentTime() / 1000)
monthAgo := now - (86400 * 30)
_, err := m.RequestHistoricMessagesForFilter(context.Background(),
monthAgo,
now,
nil,
filter,
false)
//It is possible that we already processed last existing message for community
//and won't get any updates, so send stored info in this case after timeout
go func() {
time.Sleep(15 * time.Second)
m.mutex.Lock()
defer m.mutex.Unlock()
if _, ok := m.requestedCommunities[communityID]; ok {
m.passStoredCommunityInfoToSignalHandler(communityID)
}
}()
return err
}
// forgetCommunityRequest removes community from requested ones and removes filter
func (m *Messenger) forgetCommunityRequest(communityID string) {
filter, ok := m.requestedCommunities[communityID]
if !ok {
return
}
if filter != nil {
err := m.transport.RemoveFilters([]*transport.Filter{filter})
if err != nil {
m.logger.Warn("cant remove filter", zap.Error(err))
}
}
delete(m.requestedCommunities, communityID)
}
// passStoredCommunityInfoToSignalHandler calls signal handler with community info
func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
if m.config.messengerSignalsHandler == nil {
return
}
//send signal to client that message status updated
community, err := m.communitiesManager.GetByIDString(communityID)
if community == nil {
return
}
//if there is no info helpful for client, we don't post it
if community.Name() == "" && community.Description() == "" && community.MembersCount() == 0 {
return
}
if err != nil {
m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
return
}
m.config.messengerSignalsHandler.CommunityInfoFound(community)
m.forgetCommunityRequest(communityID)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/pushnotificationclient"
"github.com/status-im/status-go/protocol/pushnotificationserver"
@ -16,6 +17,11 @@ import (
type MessageDeliveredHandler func(string, string)
type MessengerSignalsHandler interface {
MessageDelivered(chatID string, messageID string)
CommunityInfoFound(community *communities.Community)
}
type config struct {
// This needs to be exposed until we move here mailserver logic
// as otherwise the client is not notified of a new filter and
@ -47,7 +53,7 @@ type config struct {
logger *zap.Logger
messageDeliveredHandler MessageDeliveredHandler
messengerSignalsHandler MessengerSignalsHandler
}
type Option func(*config) error
@ -152,9 +158,9 @@ func WithEnvelopesMonitorConfig(emc *transport.EnvelopesMonitorConfig) Option {
}
}
func WithDeliveredHandler(h MessageDeliveredHandler) Option {
func WithSignalsHandler(h MessengerSignalsHandler) Option {
return func(c *config) error {
c.messageDeliveredHandler = h
c.messengerSignalsHandler = h
return nil
}
}

View File

@ -2207,7 +2207,7 @@ func (s *MessengerSuite) TestRequestHistoricMessagesRequest() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
m.mailserver = []byte("mailserver-id")
cursor, err := m.RequestHistoricMessages(ctx, 10, 20, []byte{0x01})
cursor, err := m.RequestHistoricMessages(ctx, 10, 20, []byte{0x01}, true)
s.EqualError(err, ctx.Err().Error())
s.Empty(cursor)
// verify request is correct

View File

@ -225,6 +225,14 @@ func (s *FiltersManager) FilterByFilterID(filterID string) *Filter {
return nil
}
// FilterByChatID returns a Filter for given chat id
func (s *FiltersManager) FilterByChatID(chatID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.filters[chatID]
}
func (s *FiltersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) {
s.mutex.Lock()
defer s.mutex.Unlock()

View File

@ -30,8 +30,19 @@ type Transport interface {
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error)
SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *Filter,
waitForResponse bool,
) (cursor []byte, err error)
FilterByChatID(string) *Filter
Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage)
InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error)

View File

@ -141,6 +141,10 @@ func (a *Transport) Filters() []*transport.Filter {
return a.filters.Filters()
}
func (a *Transport) FilterByChatID(chatID string) *transport.Filter {
return a.filters.FilterByChatID(chatID)
}
func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return a.filters.InitWithFilters(filters)
}
@ -410,17 +414,14 @@ func (a *Transport) cleanFiltersLoop() {
}()
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (a *Transport) SendMessagesRequest(
func (a *Transport) sendMessagesRequestForTopics(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
topics = append(topics, f.Topic)
}
r := createMessagesRequest(from, to, previousCursor, topics)
r.SetDefaults(a.waku.GetCurrentTime())
@ -434,6 +435,10 @@ func (a *Transport) SendMessagesRequest(
return
}
if !waitForResponse {
return
}
resp, err := a.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
@ -443,15 +448,42 @@ func (a *Transport) SendMessagesRequest(
return
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (a *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
topics = append(topics, f.Topic)
}
return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
func (a *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *transport.Filter,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
topics = append(topics, filter.Topic)
return a.sendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
}
func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
a.logger.Debug(
"waiting for request completed and received an event",
zap.Binary("requestID", requestID),
zap.Any("event", ev),
)
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}

View File

@ -139,6 +139,10 @@ func (a *Transport) Filters() []*transport.Filter {
return a.filters.Filters()
}
func (a *Transport) FilterByChatID(chatID string) *transport.Filter {
return a.filters.FilterByChatID(chatID)
}
func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return a.filters.InitWithFilters(filters)
}
@ -418,6 +422,7 @@ func (a *Transport) SendMessagesRequest(
peerID []byte,
from, to uint32,
previousCursor []byte,
waitForResponse bool,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
@ -436,6 +441,10 @@ func (a *Transport) SendMessagesRequest(
return
}
if !waitForResponse {
return
}
resp, err := a.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
@ -445,6 +454,18 @@ func (a *Transport) SendMessagesRequest(
return
}
//TODO: kozieiev: fix
func (a *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
filter *transport.Filter,
waitForResponse bool,
) (cursor []byte, err error) {
return nil, nil
}
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
return a.filters.LoadPartitioned(&key.PublicKey, key, true)
}

View File

@ -700,6 +700,10 @@ func (api *PublicAPI) EnsVerified(pk, ensName string) error {
return api.service.messenger.ENSVerified(pk, ensName)
}
func (api *PublicAPI) RequestCommunityInfoFromMailserver(communityID string) error {
return api.service.messenger.RequestCommunityInfoFromMailserver(communityID)
}
func (api *PublicAPI) UnreadActivityCenterNotificationsCount() (uint64, error) {
return api.service.messenger.UnreadActivityCenterNotificationsCount()
}

View File

@ -153,7 +153,7 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, multiAcco
s.multiAccountsDB = multiAccountDb
s.account = acc
options, err := buildMessengerOptions(s.config, identity, db, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, logger, signal.SendMessageDelivered)
options, err := buildMessengerOptions(s.config, identity, db, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, logger, &MessengerSignalsHandler{})
if err != nil {
return err
}
@ -442,7 +442,7 @@ func buildMessengerOptions(
envelopesMonitorConfig *transport.EnvelopesMonitorConfig,
accountsDB *accounts.Database,
logger *zap.Logger,
messageDeliveredHandler protocol.MessageDeliveredHandler,
messengerSignalsHandler protocol.MessengerSignalsHandler,
) ([]protocol.Option, error) {
options := []protocol.Option{
protocol.WithCustomLogger(logger),
@ -453,7 +453,7 @@ func buildMessengerOptions(
protocol.WithAccount(account),
protocol.WithEnvelopesMonitorConfig(envelopesMonitorConfig),
protocol.WithOnNegotiatedFilters(onNegotiatedFilters),
protocol.WithDeliveredHandler(messageDeliveredHandler),
protocol.WithSignalsHandler(messengerSignalsHandler),
protocol.WithENSVerificationConfig(publishMessengerResponse, config.VerifyENSURL, config.VerifyENSContractAddress),
}

View File

@ -3,6 +3,7 @@ package ext
import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/signal"
)
@ -48,3 +49,16 @@ func (h PublisherSignalHandler) FilterAdded(filters []*signal.Filter) {
func (h PublisherSignalHandler) NewMessages(response *protocol.MessengerResponse) {
signal.SendNewMessages(response)
}
// MessengerSignalHandler sends signals on messenger events
type MessengerSignalsHandler struct{}
// MessageDelivered passes information that message was delivered
func (m MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) {
signal.SendMessageDelivered(chatID, messageID)
}
// MessageDelivered passes info about community that was requested before
func (m MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) {
signal.SendCommunityInfoFound(community)
}

View File

@ -1,9 +1,15 @@
package signal
import "github.com/status-im/status-go/protocol/communities"
const (
// EventMesssageDelivered triggered when we got acknowledge from datasync level, that means peer got message
EventMesssageDelivered = "message.delivered"
// EventCommunityFound triggered when user requested info about some community and messenger successfully
// retrieved it from mailserver
EventCommunityInfoFound = "community.found"
)
// MessageDeliveredSignal specifies chat and message that was delivered
@ -12,7 +18,20 @@ type MessageDeliveredSignal struct {
MessageID string `json:"messageID"`
}
// MessageDeliveredSignal specifies chat and message that was delivered
type CommunityInfoFoundSignal struct {
Name string `json:"name"`
Description string `json:"description"`
MembersCount int `json:"membersCount"`
Verified bool `json:"verified"`
}
// SendMessageDelivered notifies about delivered message
func SendMessageDelivered(chatID string, messageID string) {
send(EventMesssageDelivered, MessageDeliveredSignal{ChatID: chatID, MessageID: messageID})
}
// SendMessageDelivered notifies about delivered message
func SendCommunityInfoFound(community *communities.Community) {
send(EventCommunityInfoFound, community)
}