status-go/protocol/messenger_communities.go
2021-07-30 13:05:44 -04:00

827 lines
22 KiB
Go

package protocol
import (
"context"
"crypto/ecdsa"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
)
// publishOrg marshals the description of org and sends it through
// sender.SendPublic, addressed by the community ID string and using the
// community private key as the sender.
func (m *Messenger) publishOrg(org *communities.Community) error {
	m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org))

	description, err := org.MarshaledDescription()
	if err != nil {
		return err
	}

	// The description goes out as-is: it is not wrapped in an encryption
	// layer message.
	msg := common.RawMessage{
		Payload:        description,
		Sender:         org.PrivateKey(),
		SkipEncryption: true,
		MessageType:    protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
	}

	if _, err := m.sender.SendPublic(context.Background(), org.IDString(), msg); err != nil {
		return err
	}
	return nil
}
// publishOrgInvitation sends invitation privately to the public key carried
// in the invitation itself, using the private key of org as the sender.
// It fails if that public key cannot be decompressed or the invitation
// cannot be marshaled.
func (m *Messenger) publishOrgInvitation(org *communities.Community, invitation *protobuf.CommunityInvitation) error {
	m.logger.Debug("publishing org invitation", zap.String("org-id", org.IDString()), zap.Any("org", org))

	recipient, err := crypto.DecompressPubkey(invitation.PublicKey)
	if err != nil {
		return err
	}

	encoded, err := proto.Marshal(invitation)
	if err != nil {
		return err
	}

	// The invitation is not wrapped in an encryption layer message.
	msg := &common.RawMessage{
		Payload:        encoded,
		Sender:         org.PrivateKey(),
		SkipEncryption: true,
		MessageType:    protobuf.ApplicationMetadataMessage_COMMUNITY_INVITATION,
	}

	if _, err := m.sender.SendPrivate(context.Background(), recipient, msg); err != nil {
		return err
	}
	return nil
}
// handleCommunitiesSubscription starts a goroutine that handles events from
// communities received on c and, on a 5 minute tick, re-publishes the
// communities returned by communitiesManager.Created() when the advertise
// interval has elapsed.
//
// The goroutine stops when c is closed or when m.quit fires; the ticker is
// released at that point.
func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscription) {
	var lastPublished int64
	// We check every 5 minutes if we need to publish
	ticker := time.NewTicker(5 * time.Minute)

	go func() {
		// Without this the ticker would stay allocated after the loop exits.
		defer ticker.Stop()

		for {
			select {
			case sub, more := <-c:
				if !more {
					return
				}

				if sub.Community != nil {
					if err := m.publishOrg(sub.Community); err != nil {
						m.logger.Warn("failed to publish org", zap.Error(err))
					}

					for _, invitation := range sub.Invitations {
						if err := m.publishOrgInvitation(sub.Community, invitation); err != nil {
							m.logger.Warn("failed to publish org invitation", zap.Error(err))
						}
					}
				} else if len(sub.Invitations) != 0 {
					// publishOrgInvitation calls methods on the community
					// (ID, private key), so invitations cannot be sent
					// without one.
					m.logger.Warn("cannot publish org invitations without a community")
				}

				m.logger.Debug("published org")

			case <-ticker.C:
				// If we are not online, we don't even try
				if !m.online() {
					continue
				}

				// If not enough time has passed since last advertisement, we skip this
				if time.Now().Unix()-lastPublished < communityAdvertiseIntervalSecond {
					continue
				}

				orgs, err := m.communitiesManager.Created()
				if err != nil {
					m.logger.Warn("failed to retrieve orgs", zap.Error(err))
					// Nothing was published: leave lastPublished untouched so
					// the next tick retries instead of waiting a full interval.
					continue
				}

				for _, org := range orgs {
					if err := m.publishOrg(org); err != nil {
						m.logger.Warn("failed to publish org", zap.Error(err))
					}
				}

				// set lastPublished
				lastPublished = time.Now().Unix()

			case <-m.quit:
				return
			}
		}
	}()
}
// Communities returns the result of communitiesManager.All().
func (m *Messenger) Communities() ([]*communities.Community, error) {
	all, err := m.communitiesManager.All()
	return all, err
}
// JoinedCommunities returns the result of communitiesManager.Joined().
func (m *Messenger) JoinedCommunities() ([]*communities.Community, error) {
	joined, err := m.communitiesManager.Joined()
	return joined, err
}
// JoinCommunity is the exported entry point for joinCommunity; it forwards
// ctx and communityID unchanged.
func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexBytes) (*MessengerResponse, error) {
	response, err := m.joinCommunity(ctx, communityID)
	return response, err
}
// joinCommunity marks the community as joined through the communities
// manager, creates its chats, installs the public transport filters for the
// default filters and the new chats, saves the chats and finally sends the
// current user status to the community.
//
// The returned response carries the created chats and the community. Any
// failure along the way aborts the operation and returns a nil response.
func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes) (*MessengerResponse, error) {
	community, err := m.communitiesManager.JoinCommunity(communityID)
	if err != nil {
		return nil, err
	}

	filterIDs := community.DefaultFilters()
	chats := CreateCommunityChats(community, m.getTimesource())

	response := &MessengerResponse{}
	response.AddChats(chats)
	for _, chat := range response.Chats() {
		filterIDs = append(filterIDs, chat.ID)
	}

	// Load transport filters
	filters, err := m.transport.InitPublicFilters(filterIDs)
	if err != nil {
		return nil, err
	}

	willSync, err := m.scheduleSyncFilters(filters)
	if err != nil {
		return nil, err
	}

	if !willSync {
		// No sync was scheduled: stamp the chats as synced from/to
		// "now minus the default sync period".
		syncPeriod, err := m.settings.GetDefaultSyncPeriod()
		if err != nil {
			return nil, err
		}

		syncedAt := uint32(m.getTimesource().GetCurrentTime()/1000) - syncPeriod
		for _, chat := range chats {
			chat.SyncedTo = syncedAt
			chat.SyncedFrom = syncedAt
		}
	}

	response.AddCommunity(community)

	if err := m.saveChats(chats); err != nil {
		return nil, err
	}

	if err := m.sendCurrentUserStatusToCommunity(ctx, community); err != nil {
		return nil, err
	}

	return response, nil
}
// SetMuted forwards the muted flag for communityID to the communities
// manager and returns its error, if any.
func (m *Messenger) SetMuted(communityID types.HexBytes, muted bool) error {
	if err := m.communitiesManager.SetMuted(communityID, muted); err != nil {
		return err
	}
	return nil
}
// RequestToJoinCommunity validates request, registers it with the
// communities manager for the current identity and sends a
// CommunityRequestToJoin message to the community.
//
// On success the response contains the request to join and the community.
// Push notifications to the keys returned by
// community.CanManageUsersPublicKeys() are sent in the background; failures
// there are only logged and never reported to the caller.
func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommunity) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, requestToJoin, err := m.communitiesManager.RequestToJoin(&m.identity.PublicKey, request)
	if err != nil {
		return nil, err
	}

	requestToJoinProto := &protobuf.CommunityRequestToJoin{
		Clock:       requestToJoin.Clock,
		EnsName:     requestToJoin.ENSName,
		CommunityId: community.ID(),
	}

	payload, err := proto.Marshal(requestToJoinProto)
	if err != nil {
		return nil, err
	}

	rawMessage := common.RawMessage{
		Payload:        payload,
		SkipEncryption: true,
		MessageType:    protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
	}
	_, err = m.sender.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{RequestsToJoinCommunity: []*communities.RequestToJoin{requestToJoin}}
	response.AddCommunity(community)

	// We send a push notification in the background
	go func() {
		if m.pushNotificationClient == nil {
			return
		}

		pks, err := community.CanManageUsersPublicKeys()
		if err != nil {
			m.logger.Error("failed to get pks", zap.Error(err))
			return
		}

		for _, publicKey := range pks {
			pkString := common.PubkeyToHex(publicKey)
			_, err := m.pushNotificationClient.SendNotification(publicKey, nil, requestToJoin.ID, pkString, protobuf.PushNotification_REQUEST_TO_JOIN_COMMUNITY)
			if err != nil {
				// A failure for one recipient must not prevent the
				// remaining ones from being notified.
				m.logger.Error("error sending notification", zap.Error(err))
				continue
			}
		}
	}()

	return response, nil
}
// CreateCommunityCategory validates request and asks the communities manager
// to create the category. The response carries the resulting community and
// the changes reported by the manager.
func (m *Messenger) CreateCommunityCategory(request *requests.CreateCommunityCategory) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, changes, err := m.communitiesManager.CreateCategory(request)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}
	return response, nil
}
// EditCommunityCategory validates request and asks the communities manager
// to edit the category. The response carries the resulting community and the
// changes reported by the manager.
func (m *Messenger) EditCommunityCategory(request *requests.EditCommunityCategory) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, changes, err := m.communitiesManager.EditCategory(request)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}
	return response, nil
}
// ReorderCommunityCategories validates request and asks the communities
// manager to reorder the categories. The response carries the resulting
// community and the changes reported by the manager.
func (m *Messenger) ReorderCommunityCategories(request *requests.ReorderCommunityCategories) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, changes, err := m.communitiesManager.ReorderCategories(request)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}
	return response, nil
}
// ReorderCommunityChat validates request and asks the communities manager to
// reorder the chat. The response carries the resulting community and the
// changes reported by the manager.
func (m *Messenger) ReorderCommunityChat(request *requests.ReorderCommunityChat) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, changes, err := m.communitiesManager.ReorderChat(request)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}
	return response, nil
}
// DeleteCommunityCategory validates request and asks the communities manager
// to delete the category. The response carries the resulting community and
// the changes reported by the manager.
func (m *Messenger) DeleteCommunityCategory(request *requests.DeleteCommunityCategory) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, changes, err := m.communitiesManager.DeleteCategory(request)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}
	return response, nil
}
// AcceptRequestToJoinCommunity validates request and passes it to
// communitiesManager.AcceptRequestToJoin. The community returned by the
// manager is added to the response.
func (m *Messenger) AcceptRequestToJoinCommunity(request *requests.AcceptRequestToJoinCommunity) (*MessengerResponse, error) {
	err := request.Validate()
	if err != nil {
		return nil, err
	}

	community, err := m.communitiesManager.AcceptRequestToJoin(request)
	if err != nil {
		return nil, err
	}

	var response MessengerResponse
	response.AddCommunity(community)
	return &response, nil
}
// DeclineRequestToJoinCommunity validates request and, when it is valid,
// passes it to communitiesManager.DeclineRequestToJoin. It returns the first
// error encountered.
func (m *Messenger) DeclineRequestToJoinCommunity(request *requests.DeclineRequestToJoinCommunity) error {
	err := request.Validate()
	if err == nil {
		err = m.communitiesManager.DeclineRequestToJoin(request)
	}
	return err
}
// LeaveCommunity is the exported entry point for leaveCommunity; it forwards
// communityID unchanged.
func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerResponse, error) {
	response, err := m.leaveCommunity(communityID)
	return response, err
}
// leaveCommunity leaves the community through the communities manager, then
// deletes every community chat and removes its transport filter, and finally
// removes the filter registered under the community ID itself.
//
// The response lists the removed chat IDs and the community. The first error
// aborts the operation and returns a nil response.
func (m *Messenger) leaveCommunity(communityID types.HexBytes) (*MessengerResponse, error) {
	community, err := m.communitiesManager.LeaveCommunity(communityID)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}

	// Make chat inactive
	for id := range community.Chats() {
		// Chat IDs are the community ID string followed by the per-community chat ID.
		fullChatID := communityID.String() + id

		if err := m.deleteChat(fullChatID); err != nil {
			return nil, err
		}
		response.AddRemovedChat(fullChatID)

		if _, err := m.transport.RemoveFilterByChatID(fullChatID); err != nil {
			return nil, err
		}
	}

	if _, err := m.transport.RemoveFilterByChatID(communityID.String()); err != nil {
		return nil, err
	}

	response.AddCommunity(community)
	return response, nil
}
// CreateCommunityChat asks the communities manager to create the chat c in
// the community identified by communityID, builds a Chat for every entry in
// changes.ChatsAdded, installs and schedules syncing of their public
// filters, and saves the chats.
//
// On success the response carries the community, the changes and the new
// chats. On any error, including a failure to save the chats, the response
// is nil.
func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.CommunityChat) (*MessengerResponse, error) {
	community, changes, err := m.communitiesManager.CreateChat(communityID, c)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}

	var chats []*Chat
	var chatIDs []string
	for addedID, added := range changes.ChatsAdded {
		chat := CreateCommunityChat(community.IDString(), addedID, added, m.getTimesource())
		chats = append(chats, chat)
		chatIDs = append(chatIDs, chat.ID)
		response.AddChat(chat)
	}

	// Load filters
	filters, err := m.transport.InitPublicFilters(chatIDs)
	if err != nil {
		return nil, err
	}
	if _, err := m.scheduleSyncFilters(filters); err != nil {
		return nil, err
	}

	// Do not hand back a response when persisting failed, in line with the
	// other error paths of this function.
	if err := m.saveChats(chats); err != nil {
		return nil, err
	}
	return response, nil
}
// EditCommunityChat asks the communities manager to apply c to the chat
// chatID of the community identified by communityID, rebuilds a Chat for
// every entry in changes.ChatsModified, installs and schedules syncing of
// their public filters, and saves the chats.
//
// On success the response carries the community, the changes and the
// modified chats. On any error, including a failure to save the chats, the
// response is nil.
func (m *Messenger) EditCommunityChat(communityID types.HexBytes, chatID string, c *protobuf.CommunityChat) (*MessengerResponse, error) {
	community, changes, err := m.communitiesManager.EditChat(communityID, chatID, c)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	response.AddCommunity(community)
	response.CommunityChanges = []*communities.CommunityChanges{changes}

	var chats []*Chat
	var chatIDs []string
	for modifiedID, change := range changes.ChatsModified {
		chat := CreateCommunityChat(community.IDString(), modifiedID, change.ChatModified, m.getTimesource())
		chats = append(chats, chat)
		chatIDs = append(chatIDs, chat.ID)
		response.AddChat(chat)
	}

	// Load filters
	filters, err := m.transport.InitPublicFilters(chatIDs)
	if err != nil {
		return nil, err
	}
	if _, err := m.scheduleSyncFilters(filters); err != nil {
		return nil, err
	}

	// Do not hand back a response when persisting failed, in line with the
	// other error paths of this function.
	if err := m.saveChats(chats); err != nil {
		return nil, err
	}
	return response, nil
}
// DeleteCommunityChat removes chatID from the community through the
// communities manager, deletes the local chat and removes its transport
// filter. The response lists the removed chat ID and the community.
func (m *Messenger) DeleteCommunityChat(communityID types.HexBytes, chatID string) (*MessengerResponse, error) {
	community, _, err := m.communitiesManager.DeleteChat(communityID, chatID)
	if err != nil {
		return nil, err
	}

	if err := m.deleteChat(chatID); err != nil {
		return nil, err
	}

	result := &MessengerResponse{}
	result.AddRemovedChat(chatID)

	if _, err := m.transport.RemoveFilterByChatID(chatID); err != nil {
		return nil, err
	}

	result.AddCommunity(community)
	return result, nil
}
// CreateCommunity validates request, creates the community through the
// communities manager and installs the transport filters for it: the
// community filter for its private key and the public filters listed by
// community.DefaultFilters().
func (m *Messenger) CreateCommunity(request *requests.CreateCommunity) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	community, err := m.communitiesManager.CreateCommunity(request)
	if err != nil {
		return nil, err
	}

	// Init the community filter so we can receive messages on the community
	communityKeys := []*ecdsa.PrivateKey{community.PrivateKey()}
	if _, err := m.transport.InitCommunityFilters(communityKeys); err != nil {
		return nil, err
	}

	// Init the default community filters
	if _, err := m.transport.InitPublicFilters(community.DefaultFilters()); err != nil {
		return nil, err
	}

	var response MessengerResponse
	response.AddCommunity(community)
	return &response, nil
}
// EditCommunity validates request and passes it to
// communitiesManager.EditCommunity. The community returned by the manager is
// added to the response.
func (m *Messenger) EditCommunity(request *requests.EditCommunity) (*MessengerResponse, error) {
	err := request.Validate()
	if err != nil {
		return nil, err
	}

	community, err := m.communitiesManager.EditCommunity(request)
	if err != nil {
		return nil, err
	}

	var response MessengerResponse
	response.AddCommunity(community)
	return &response, nil
}
// ExportCommunity returns the result of communitiesManager.ExportCommunity
// for id.
func (m *Messenger) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error) {
	key, err := m.communitiesManager.ExportCommunity(id)
	return key, err
}
// ImportCommunity imports the community owned by key through the communities
// manager, installs its default public filters, invites the current identity
// to it and then joins it via JoinCommunity.
func (m *Messenger) ImportCommunity(ctx context.Context, key *ecdsa.PrivateKey) (*MessengerResponse, error) {
	community, err := m.communitiesManager.ImportCommunity(key)
	if err != nil {
		return nil, err
	}

	// Load filters
	if _, err := m.transport.InitPublicFilters(community.DefaultFilters()); err != nil {
		return nil, err
	}

	// Ask the mailserver for the info it already stores. The import does not
	// depend on this succeeding, so the error is deliberately dropped.
	_ = m.RequestCommunityInfoFromMailserver(community.IDString())

	// We add ourselves
	self := []*ecdsa.PublicKey{&m.identity.PublicKey}
	if _, err := m.communitiesManager.InviteUsersToCommunity(community.ID(), self); err != nil {
		return nil, err
	}

	return m.JoinCommunity(ctx, community.ID())
}
// InviteUsersToCommunity validates request, then for every user in
// request.Users prepares an invitation chat message and makes sure a
// one-to-one chat exists. It then registers the invitations with the
// communities manager and sends the prepared messages.
//
// The response merges the one-to-one chat responses and the send response,
// and carries the community returned by the manager. It returns an error if
// the community cannot be found, a user key cannot be parsed, or any of the
// steps above fails.
func (m *Messenger) InviteUsersToCommunity(request *requests.InviteUsersToCommunity) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	var messages []*common.Message
	var publicKeys []*ecdsa.PublicKey

	community, err := m.communitiesManager.GetByID(request.CommunityID)
	if err != nil {
		return nil, err
	}
	// NOTE(review): a lookup may yield a nil community without an error
	// (passStoredCommunityInfoToSignalHandler guards against that case too);
	// fail explicitly rather than dereferencing nil below.
	if community == nil {
		return nil, fmt.Errorf("community not found: %s", request.CommunityID.String())
	}

	for _, pkBytes := range request.Users {
		publicKey, err := common.HexToPubkey(pkBytes.String())
		if err != nil {
			return nil, err
		}
		publicKeys = append(publicKeys, publicKey)

		message := &common.Message{}
		message.ChatId = pkBytes.String()
		message.CommunityID = request.CommunityID.String()
		message.Text = fmt.Sprintf("You have been invited to community %s", community.Name())
		messages = append(messages, message)

		r, err := m.CreateOneToOneChat(&requests.CreateOneToOneChat{ID: pkBytes})
		if err != nil {
			return nil, err
		}
		if err := response.Merge(r); err != nil {
			return nil, err
		}
	}

	community, err = m.communitiesManager.InviteUsersToCommunity(request.CommunityID, publicKeys)
	if err != nil {
		return nil, err
	}

	sendMessagesResponse, err := m.SendChatMessages(context.Background(), messages)
	if err != nil {
		return nil, err
	}
	if err := response.Merge(sendMessagesResponse); err != nil {
		return nil, err
	}

	response.AddCommunity(community)
	return response, nil
}
// ShareCommunity validates request, then for every user in request.Users
// prepares a chat message announcing the community and makes sure a
// one-to-one chat exists, and finally sends the prepared messages.
//
// The response merges the one-to-one chat responses and the send response.
// It returns an error if the community cannot be found or any of the steps
// above fails.
func (m *Messenger) ShareCommunity(request *requests.ShareCommunity) (*MessengerResponse, error) {
	if err := request.Validate(); err != nil {
		return nil, err
	}

	response := &MessengerResponse{}

	community, err := m.communitiesManager.GetByID(request.CommunityID)
	if err != nil {
		return nil, err
	}
	// NOTE(review): a lookup may yield a nil community without an error
	// (passStoredCommunityInfoToSignalHandler guards against that case too);
	// fail explicitly rather than dereferencing nil below.
	if community == nil {
		return nil, fmt.Errorf("community not found: %s", request.CommunityID.String())
	}

	var messages []*common.Message
	for _, pk := range request.Users {
		message := &common.Message{}
		message.ChatId = pk.String()
		message.CommunityID = request.CommunityID.String()
		message.Text = fmt.Sprintf("Community %s has been shared with you", community.Name())
		messages = append(messages, message)

		r, err := m.CreateOneToOneChat(&requests.CreateOneToOneChat{ID: pk})
		if err != nil {
			return nil, err
		}
		if err := response.Merge(r); err != nil {
			return nil, err
		}
	}

	sendMessagesResponse, err := m.SendChatMessages(context.Background(), messages)
	if err != nil {
		return nil, err
	}
	if err := response.Merge(sendMessagesResponse); err != nil {
		return nil, err
	}

	return response, nil
}
// MyPendingRequestsToJoin returns the result of
// communitiesManager.PendingRequestsToJoinForUser for the messenger's own
// public key.
func (m *Messenger) MyPendingRequestsToJoin() ([]*communities.RequestToJoin, error) {
	self := &m.identity.PublicKey
	pending, err := m.communitiesManager.PendingRequestsToJoinForUser(self)
	return pending, err
}
// PendingRequestsToJoinForCommunity returns the result of
// communitiesManager.PendingRequestsToJoinForCommunity for id.
func (m *Messenger) PendingRequestsToJoinForCommunity(id types.HexBytes) ([]*communities.RequestToJoin, error) {
	pending, err := m.communitiesManager.PendingRequestsToJoinForCommunity(id)
	return pending, err
}
// RemoveUserFromCommunity parses pkString as a hex public key and asks the
// communities manager to remove that user from the community id. The
// community returned by the manager is added to the response.
func (m *Messenger) RemoveUserFromCommunity(id types.HexBytes, pkString string) (*MessengerResponse, error) {
	user, err := common.HexToPubkey(pkString)
	if err != nil {
		return nil, err
	}

	community, err := m.communitiesManager.RemoveUserFromCommunity(id, user)
	if err != nil {
		return nil, err
	}

	var response MessengerResponse
	response.AddCommunity(community)
	return &response, nil
}
// BanUserFromCommunity passes request to
// communitiesManager.BanUserFromCommunity and adds the community returned by
// the manager to the response.
//
// NOTE(review): unlike the other request-based methods in this file, the
// request is not validated here — confirm whether the manager does it.
func (m *Messenger) BanUserFromCommunity(request *requests.BanUserFromCommunity) (*MessengerResponse, error) {
	community, err := m.communitiesManager.BanUserFromCommunity(request)
	if err != nil {
		return nil, err
	}

	var response MessengerResponse
	response.AddCommunity(community)
	return &response, nil
}
// RequestCommunityInfoFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler.
//
// A community that is already recorded in requestedCommunities is not
// requested again and nil is returned. Otherwise the last 30 days of history
// are requested for the community filter, and the error of that request (if
// any) is returned.
//
// Independently of the request outcome, a background goroutine waits 15
// seconds and, if the community is still recorded as requested, passes the
// locally stored info to the signal handler. The wait is abandoned when
// m.quit fires.
func (m *Messenger) RequestCommunityInfoFromMailserver(communityID string) error {
	if _, ok := m.requestedCommunities[communityID]; ok {
		return nil
	}

	//If filter wasn't installed we create it and remember for deinstalling after
	//response received
	filter := m.transport.FilterByChatID(communityID)
	if filter == nil {
		filters, err := m.transport.InitPublicFilters([]string{communityID})
		if err != nil {
			return fmt.Errorf("Can't install filter for community: %v", err)
		}
		if len(filters) != 1 {
			return fmt.Errorf("Unexpected amount of filters created")
		}
		filter = filters[0]
		m.requestedCommunities[communityID] = filter
	} else {
		//we don't remember filter id associated with community because it was already installed
		m.requestedCommunities[communityID] = nil
	}

	// Request window: from 30 days ago until now, in seconds.
	now := uint32(m.transport.GetCurrentTime() / 1000)
	monthAgo := now - (86400 * 30)

	_, _, err := m.RequestHistoricMessagesForFilter(context.Background(),
		monthAgo,
		now,
		nil,
		nil,
		filter,
		false)

	//It is possible that we already processed last existing message for community
	//and won't get any updates, so send stored info in this case after timeout
	go func() {
		timer := time.NewTimer(15 * time.Second)
		defer timer.Stop()

		select {
		case <-timer.C:
		case <-m.quit:
			// The messenger is shutting down: do not touch its state.
			return
		}

		m.mutex.Lock()
		defer m.mutex.Unlock()
		if _, ok := m.requestedCommunities[communityID]; ok {
			m.passStoredCommunityInfoToSignalHandler(communityID)
		}
	}()

	return err
}
// forgetCommunityRequest drops communityID from requestedCommunities. When a
// filter was remembered for the request it is removed from the transport
// first; a failure to remove it is only logged.
func (m *Messenger) forgetCommunityRequest(communityID string) {
	installed, requested := m.requestedCommunities[communityID]
	if !requested {
		return
	}

	// A nil entry means no filter was remembered for this request.
	if installed != nil {
		if err := m.transport.RemoveFilters([]*transport.Filter{installed}); err != nil {
			m.logger.Warn("cant remove filter", zap.Error(err))
		}
	}

	delete(m.requestedCommunities, communityID)
}
// passStoredCommunityInfoToSignalHandler calls signal handler with community info.
//
// It does nothing when no signal handler is configured, when the community
// cannot be loaded (the error is logged), when no community is stored, or
// when the stored community has no name, no description and no members.
// After a successful hand-off the community request is forgotten.
func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
	if m.config.messengerSignalsHandler == nil {
		return
	}

	community, err := m.communitiesManager.GetByIDString(communityID)
	// Check the error before looking at the result, so that a failed lookup
	// is reported instead of being mistaken for "nothing stored".
	if err != nil {
		m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
		return
	}

	if community == nil {
		return
	}

	//if there is no info helpful for client, we don't post it
	if community.Name() == "" && community.Description() == "" && community.MembersCount() == 0 {
		return
	}

	m.config.messengerSignalsHandler.CommunityInfoFound(community)
	m.forgetCommunityRequest(communityID)
}
// handleCommunityDescription handles a community description message.
//
// The description is passed to the communities manager; the resulting
// community and changes are recorded in state.Response. When the community
// has been joined, its chats are reconciled with state.AllChats: unknown
// chats are stored and get public filters installed, known chats have their
// name and description refreshed. Removal of chats is not supported.
func (m *Messenger) handleCommunityDescription(state *ReceivedMessageState, signer *ecdsa.PublicKey, description protobuf.CommunityDescription, rawPayload []byte) error {
	handled, err := m.communitiesManager.HandleCommunityDescriptionMessage(signer, &description, rawPayload)
	if err != nil {
		return err
	}

	community := handled.Community
	state.Response.AddCommunity(community)
	state.Response.CommunityChanges = append(state.Response.CommunityChanges, handled.Changes)

	// If we haven't joined the org, nothing to do
	if !community.Joined() {
		return nil
	}

	communityChats := CreateCommunityChats(community, state.Timesource)

	var newChatIDs []string
	for idx, chat := range communityChats {
		known, found := state.AllChats.Load(chat.ID)
		switch {
		case !found:
			// Store the slice element rather than the range variable.
			state.AllChats.Store(chat.ID, communityChats[idx])
			state.Response.AddChat(chat)
			newChatIDs = append(newChatIDs, chat.ID)

		case known.Name != chat.Name || known.Description != chat.Description:
			// Name and description are the only fields refreshed on a
			// chat we already know.
			known.Name = chat.Name
			known.Description = chat.Description
			// TODO(samyoul) remove storing of an updated reference pointer?
			state.AllChats.Store(chat.ID, known)
			state.Response.AddChat(chat)
		}
	}

	// Load transport filters
	filters, err := m.transport.InitPublicFilters(newChatIDs)
	if err != nil {
		return err
	}

	_, err = m.scheduleSyncFilters(filters)
	return err
}