mirror of
https://github.com/status-im/status-go.git
synced 2025-02-04 19:04:16 +00:00
631907839a
Previously, we've turned archive support off by default, this means, community member nodes won't try to download torrent data when they receive a magnetlink. This commit ensures that communities joined in the future will have this setting on, and also all existing communities that the node isn't admin of will have this feature enabled.
1403 lines
40 KiB
Go
1403 lines
40 KiB
Go
package protocol
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/communities"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/requests"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
)
|
|
|
|
// 7 days interval
|
|
var messageArchiveInterval = 7 * 24 * time.Hour
|
|
|
|
func (m *Messenger) publishOrg(org *communities.Community) error {
|
|
m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org))
|
|
payload, err := org.MarshaledDescription()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rawMessage := common.RawMessage{
|
|
Payload: payload,
|
|
Sender: org.PrivateKey(),
|
|
// we don't want to wrap in an encryption layer message
|
|
SkipEncryption: true,
|
|
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
|
|
}
|
|
_, err = m.sender.SendPublic(context.Background(), org.IDString(), rawMessage)
|
|
return err
|
|
}
|
|
|
|
func (m *Messenger) publishOrgInvitation(org *communities.Community, invitation *protobuf.CommunityInvitation) error {
|
|
m.logger.Debug("publishing org invitation", zap.String("org-id", org.IDString()), zap.Any("org", org))
|
|
pk, err := crypto.DecompressPubkey(invitation.PublicKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
payload, err := proto.Marshal(invitation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rawMessage := common.RawMessage{
|
|
Payload: payload,
|
|
Sender: org.PrivateKey(),
|
|
// we don't want to wrap in an encryption layer message
|
|
SkipEncryption: true,
|
|
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_INVITATION,
|
|
}
|
|
_, err = m.sender.SendPrivate(context.Background(), pk, &rawMessage)
|
|
return err
|
|
}
|
|
|
|
func (m *Messenger) handleCommunitiesHistoryArchivesSubscription(c chan *communities.Subscription) {
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case sub, more := <-c:
|
|
if !more {
|
|
return
|
|
}
|
|
|
|
if sub.CreatingHistoryArchivesSignal != nil {
|
|
m.config.messengerSignalsHandler.CreatingHistoryArchives(sub.CreatingHistoryArchivesSignal.CommunityID)
|
|
}
|
|
|
|
if sub.HistoryArchivesCreatedSignal != nil {
|
|
m.config.messengerSignalsHandler.HistoryArchivesCreated(
|
|
sub.HistoryArchivesCreatedSignal.CommunityID,
|
|
sub.HistoryArchivesCreatedSignal.From,
|
|
sub.HistoryArchivesCreatedSignal.To,
|
|
)
|
|
}
|
|
|
|
if sub.NoHistoryArchivesCreatedSignal != nil {
|
|
m.config.messengerSignalsHandler.NoHistoryArchivesCreated(
|
|
sub.NoHistoryArchivesCreatedSignal.CommunityID,
|
|
sub.NoHistoryArchivesCreatedSignal.From,
|
|
sub.NoHistoryArchivesCreatedSignal.To,
|
|
)
|
|
}
|
|
|
|
if sub.HistoryArchivesSeedingSignal != nil {
|
|
|
|
m.config.messengerSignalsHandler.HistoryArchivesSeeding(sub.HistoryArchivesSeedingSignal.CommunityID)
|
|
|
|
err := m.dispatchMagnetlinkMessage(sub.HistoryArchivesSeedingSignal.CommunityID)
|
|
if err != nil {
|
|
m.logger.Debug("failed to dispatch magnetlink message", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if sub.HistoryArchivesUnseededSignal != nil {
|
|
m.config.messengerSignalsHandler.HistoryArchivesUnseeded(sub.HistoryArchivesUnseededSignal.CommunityID)
|
|
}
|
|
|
|
if sub.HistoryArchiveDownloadedSignal != nil {
|
|
m.config.messengerSignalsHandler.HistoryArchiveDownloaded(
|
|
sub.HistoryArchiveDownloadedSignal.CommunityID,
|
|
sub.HistoryArchiveDownloadedSignal.From,
|
|
sub.HistoryArchiveDownloadedSignal.To,
|
|
)
|
|
}
|
|
case <-m.quit:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// handleCommunitiesSubscription handles events from communities
|
|
func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscription) {
|
|
|
|
var lastPublished int64
|
|
// We check every 5 minutes if we need to publish
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case sub, more := <-c:
|
|
if !more {
|
|
return
|
|
}
|
|
if sub.Community != nil {
|
|
err := m.publishOrg(sub.Community)
|
|
if err != nil {
|
|
m.logger.Warn("failed to publish org", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
for _, invitation := range sub.Invitations {
|
|
err := m.publishOrgInvitation(sub.Community, invitation)
|
|
if err != nil {
|
|
m.logger.Warn("failed to publish org invitation", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
m.logger.Debug("published org")
|
|
case <-ticker.C:
|
|
// If we are not online, we don't even try
|
|
if !m.online() {
|
|
continue
|
|
}
|
|
|
|
// If not enough time has passed since last advertisement, we skip this
|
|
if time.Now().Unix()-lastPublished < communityAdvertiseIntervalSecond {
|
|
continue
|
|
}
|
|
|
|
orgs, err := m.communitiesManager.Created()
|
|
if err != nil {
|
|
m.logger.Warn("failed to retrieve orgs", zap.Error(err))
|
|
}
|
|
|
|
for idx := range orgs {
|
|
org := orgs[idx]
|
|
err := m.publishOrg(org)
|
|
if err != nil {
|
|
m.logger.Warn("failed to publish org", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// set lastPublished
|
|
lastPublished = time.Now().Unix()
|
|
|
|
case <-m.quit:
|
|
return
|
|
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (m *Messenger) Communities() ([]*communities.Community, error) {
|
|
return m.communitiesManager.All()
|
|
}
|
|
|
|
func (m *Messenger) JoinedCommunities() ([]*communities.Community, error) {
|
|
return m.communitiesManager.Joined()
|
|
}
|
|
|
|
func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexBytes) (*MessengerResponse, error) {
|
|
mr, err := m.joinCommunity(ctx, communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
communitySettings := communities.CommunitySettings{
|
|
CommunityID: communityID.String(),
|
|
HistoryArchiveSupportEnabled: true,
|
|
}
|
|
|
|
err = m.communitiesManager.SaveCommunitySettings(communitySettings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mr.AddCommunitySettings(&communitySettings)
|
|
|
|
if com, ok := mr.communities[communityID.String()]; ok {
|
|
err = m.syncCommunity(context.Background(), com)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return mr, nil
|
|
}
|
|
|
|
func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes) (*MessengerResponse, error) {
|
|
logger := m.logger.Named("joinCommunity")
|
|
|
|
response := &MessengerResponse{}
|
|
|
|
community, err := m.communitiesManager.JoinCommunity(communityID)
|
|
if err != nil {
|
|
logger.Debug("m.communitiesManager.JoinCommunity error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
chatIDs := community.DefaultFilters()
|
|
|
|
chats := CreateCommunityChats(community, m.getTimesource())
|
|
response.AddChats(chats)
|
|
|
|
for _, chat := range response.Chats() {
|
|
chatIDs = append(chatIDs, chat.ID)
|
|
}
|
|
|
|
// Load transport filters
|
|
filters, err := m.transport.InitPublicFilters(chatIDs)
|
|
if err != nil {
|
|
logger.Debug("m.transport.InitPublicFilters error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
if community.IsAdmin() {
|
|
// Init the community filter so we can receive messages on the community
|
|
communityFilters, err := m.transport.InitCommunityFilters([]*ecdsa.PrivateKey{community.PrivateKey()})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
filters = append(filters, communityFilters...)
|
|
}
|
|
|
|
willSync, err := m.scheduleSyncFilters(filters)
|
|
if err != nil {
|
|
logger.Debug("m.scheduleSyncFilters error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
if !willSync {
|
|
defaultSyncPeriod, err := m.settings.GetDefaultSyncPeriod()
|
|
if err != nil {
|
|
logger.Debug("m.settings.GetDefaultSyncPeriod error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
timestamp := uint32(m.getTimesource().GetCurrentTime()/1000) - defaultSyncPeriod
|
|
for idx := range chats {
|
|
chats[idx].SyncedTo = timestamp
|
|
chats[idx].SyncedFrom = timestamp
|
|
}
|
|
}
|
|
|
|
response.AddCommunity(community)
|
|
|
|
if err = m.saveChats(chats); err != nil {
|
|
logger.Debug("m.saveChats error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
err = m.reregisterForPushNotifications()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.sendCurrentUserStatusToCommunity(ctx, community)
|
|
if err != nil {
|
|
logger.Debug("m.sendCurrentUserStatusToCommunity error", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) SetMuted(communityID types.HexBytes, muted bool) error {
|
|
return m.communitiesManager.SetMuted(communityID, muted)
|
|
}
|
|
|
|
func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommunity) (*MessengerResponse, error) {
|
|
logger := m.logger.Named("RequestToJoinCommunity")
|
|
if err := request.Validate(); err != nil {
|
|
logger.Debug("request failed to validate", zap.Error(err), zap.Any("request", request))
|
|
return nil, err
|
|
}
|
|
|
|
community, requestToJoin, err := m.communitiesManager.RequestToJoin(&m.identity.PublicKey, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = m.syncCommunity(context.Background(), community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestToJoinProto := &protobuf.CommunityRequestToJoin{
|
|
Clock: requestToJoin.Clock,
|
|
EnsName: requestToJoin.ENSName,
|
|
CommunityId: community.ID(),
|
|
}
|
|
|
|
payload, err := proto.Marshal(requestToJoinProto)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rawMessage := common.RawMessage{
|
|
Payload: payload,
|
|
SkipEncryption: true,
|
|
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
|
|
}
|
|
_, err = m.sender.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{RequestsToJoinCommunity: []*communities.RequestToJoin{requestToJoin}}
|
|
response.AddCommunity(community)
|
|
|
|
// We send a push notification in the background
|
|
go func() {
|
|
if m.pushNotificationClient != nil {
|
|
pks, err := community.CanManageUsersPublicKeys()
|
|
if err != nil {
|
|
m.logger.Error("failed to get pks", zap.Error(err))
|
|
return
|
|
}
|
|
for _, publicKey := range pks {
|
|
pkString := common.PubkeyToHex(publicKey)
|
|
_, err = m.pushNotificationClient.SendNotification(publicKey, nil, requestToJoin.ID, pkString, protobuf.PushNotification_REQUEST_TO_JOIN_COMMUNITY)
|
|
if err != nil {
|
|
m.logger.Error("error sending notification", zap.Error(err))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) CreateCommunityCategory(request *requests.CreateCommunityCategory) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.CreateCategory(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) EditCommunityCategory(request *requests.EditCommunityCategory) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.EditCategory(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) ReorderCommunityCategories(request *requests.ReorderCommunityCategories) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.ReorderCategories(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) ReorderCommunityChat(request *requests.ReorderCommunityChat) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.ReorderChat(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) DeleteCommunityCategory(request *requests.DeleteCommunityCategory) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.DeleteCategory(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) AcceptRequestToJoinCommunity(request *requests.AcceptRequestToJoinCommunity) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.communitiesManager.AcceptRequestToJoin(request)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) DeclineRequestToJoinCommunity(request *requests.DeclineRequestToJoinCommunity) error {
|
|
if err := request.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.communitiesManager.DeclineRequestToJoin(request)
|
|
}
|
|
|
|
func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerResponse, error) {
|
|
err := m.persistence.DismissAllActivityCenterNotificationsFromCommunity(communityID.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mr, err := m.leaveCommunity(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.communitiesManager.DeleteCommunitySettings(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.communitiesManager.StopHistoryArchiveTasksInterval(communityID)
|
|
|
|
if com, ok := mr.communities[communityID.String()]; ok {
|
|
err = m.syncCommunity(context.Background(), com)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return mr, nil
|
|
}
|
|
|
|
func (m *Messenger) leaveCommunity(communityID types.HexBytes) (*MessengerResponse, error) {
|
|
response := &MessengerResponse{}
|
|
|
|
community, err := m.communitiesManager.LeaveCommunity(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Make chat inactive
|
|
for chatID := range community.Chats() {
|
|
communityChatID := communityID.String() + chatID
|
|
err := m.deleteChat(communityChatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddRemovedChat(communityChatID)
|
|
|
|
_, err = m.transport.RemoveFilterByChatID(communityChatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
_, err = m.transport.RemoveFilterByChatID(communityID.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.CommunityChat) (*MessengerResponse, error) {
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.CreateChat(communityID, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
var chats []*Chat
|
|
var chatIDs []string
|
|
for chatID, chat := range changes.ChatsAdded {
|
|
c := CreateCommunityChat(community.IDString(), chatID, chat, m.getTimesource())
|
|
chats = append(chats, c)
|
|
chatIDs = append(chatIDs, c.ID)
|
|
response.AddChat(c)
|
|
}
|
|
|
|
// Load filters
|
|
filters, err := m.transport.InitPublicFilters(chatIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = m.scheduleSyncFilters(filters)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.saveChats(chats)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.reregisterForPushNotifications()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (m *Messenger) EditCommunityChat(communityID types.HexBytes, chatID string, c *protobuf.CommunityChat) (*MessengerResponse, error) {
|
|
var response MessengerResponse
|
|
community, changes, err := m.communitiesManager.EditChat(communityID, chatID, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddCommunity(community)
|
|
response.CommunityChanges = []*communities.CommunityChanges{changes}
|
|
|
|
var chats []*Chat
|
|
var chatIDs []string
|
|
for chatID, change := range changes.ChatsModified {
|
|
c := CreateCommunityChat(community.IDString(), chatID, change.ChatModified, m.getTimesource())
|
|
chats = append(chats, c)
|
|
chatIDs = append(chatIDs, c.ID)
|
|
response.AddChat(c)
|
|
}
|
|
|
|
// Load filters
|
|
filters, err := m.transport.InitPublicFilters(chatIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = m.scheduleSyncFilters(filters)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &response, m.saveChats(chats)
|
|
}
|
|
|
|
func (m *Messenger) DeleteCommunityChat(communityID types.HexBytes, chatID string) (*MessengerResponse, error) {
|
|
response := &MessengerResponse{}
|
|
|
|
community, _, err := m.communitiesManager.DeleteChat(communityID, chatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = m.deleteChat(chatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
response.AddRemovedChat(chatID)
|
|
|
|
_, err = m.transport.RemoveFilterByChatID(chatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) CreateCommunity(request *requests.CreateCommunity) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.communitiesManager.CreateCommunity(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
communitySettings := communities.CommunitySettings{
|
|
CommunityID: community.IDString(),
|
|
HistoryArchiveSupportEnabled: request.HistoryArchiveSupportEnabled,
|
|
}
|
|
err = m.communitiesManager.SaveCommunitySettings(communitySettings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Init the community filter so we can receive messages on the community
|
|
_, err = m.transport.InitCommunityFilters([]*ecdsa.PrivateKey{community.PrivateKey()})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Init the default community filters
|
|
_, err = m.transport.InitPublicFilters(community.DefaultFilters())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
response.AddCommunity(community)
|
|
response.AddCommunitySettings(&communitySettings)
|
|
err = m.syncCommunity(context.Background(), community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && communitySettings.HistoryArchiveSupportEnabled {
|
|
go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) EditCommunity(request *requests.EditCommunity) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.communitiesManager.EditCommunity(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
communitySettings := communities.CommunitySettings{
|
|
CommunityID: community.IDString(),
|
|
HistoryArchiveSupportEnabled: request.HistoryArchiveSupportEnabled,
|
|
}
|
|
err = m.communitiesManager.UpdateCommunitySettings(communitySettings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
id := community.ID()
|
|
|
|
if m.config.torrentConfig.Enabled {
|
|
if !communitySettings.HistoryArchiveSupportEnabled {
|
|
m.communitiesManager.StopHistoryArchiveTasksInterval(id)
|
|
} else if !m.communitiesManager.IsSeedingHistoryArchiveTorrent(id) {
|
|
var communities []*communities.Community
|
|
communities = append(communities, community)
|
|
go m.InitHistoryArchiveTasks(communities)
|
|
}
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
response.AddCommunity(community)
|
|
response.AddCommunitySettings(&communitySettings)
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error) {
|
|
return m.communitiesManager.ExportCommunity(id)
|
|
}
|
|
|
|
func (m *Messenger) ImportCommunity(ctx context.Context, key *ecdsa.PrivateKey) (*MessengerResponse, error) {
|
|
community, err := m.communitiesManager.ImportCommunity(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Load filters
|
|
_, err = m.transport.InitPublicFilters(community.DefaultFilters())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
//request info already stored on mailserver, but its success is not crucial
|
|
// for import
|
|
_, _ = m.RequestCommunityInfoFromMailserver(community.IDString())
|
|
|
|
// We add ourselves
|
|
_, err = m.communitiesManager.InviteUsersToCommunity(community.ID(), []*ecdsa.PublicKey{&m.identity.PublicKey})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response, err := m.JoinCommunity(ctx, community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled {
|
|
var communities []*communities.Community
|
|
communities = append(communities, community)
|
|
go m.InitHistoryArchiveTasks(communities)
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) InviteUsersToCommunity(request *requests.InviteUsersToCommunity) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
var messages []*common.Message
|
|
|
|
var publicKeys []*ecdsa.PublicKey
|
|
community, err := m.communitiesManager.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pkBytes := range request.Users {
|
|
publicKey, err := common.HexToPubkey(pkBytes.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
publicKeys = append(publicKeys, publicKey)
|
|
|
|
message := &common.Message{}
|
|
message.ChatId = pkBytes.String()
|
|
message.CommunityID = request.CommunityID.String()
|
|
message.Text = fmt.Sprintf("You have been invited to community %s", community.Name())
|
|
messages = append(messages, message)
|
|
r, err := m.CreateOneToOneChat(&requests.CreateOneToOneChat{ID: pkBytes})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := response.Merge(r); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
community, err = m.communitiesManager.InviteUsersToCommunity(request.CommunityID, publicKeys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sendMessagesResponse, err := m.SendChatMessages(context.Background(), messages)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := response.Merge(sendMessagesResponse); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) GetCommunityByID(communityID types.HexBytes) (*communities.Community, error) {
|
|
return m.communitiesManager.GetByID(communityID)
|
|
}
|
|
|
|
func (m *Messenger) ShareCommunity(request *requests.ShareCommunity) (*MessengerResponse, error) {
|
|
if err := request.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
response := &MessengerResponse{}
|
|
|
|
community, err := m.communitiesManager.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var messages []*common.Message
|
|
for _, pk := range request.Users {
|
|
message := &common.Message{}
|
|
message.ChatId = pk.String()
|
|
message.CommunityID = request.CommunityID.String()
|
|
message.Text = fmt.Sprintf("Community %s has been shared with you", community.Name())
|
|
messages = append(messages, message)
|
|
r, err := m.CreateOneToOneChat(&requests.CreateOneToOneChat{ID: pk})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := response.Merge(r); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
sendMessagesResponse, err := m.SendChatMessages(context.Background(), messages)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := response.Merge(sendMessagesResponse); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) MyPendingRequestsToJoin() ([]*communities.RequestToJoin, error) {
|
|
return m.communitiesManager.PendingRequestsToJoinForUser(&m.identity.PublicKey)
|
|
}
|
|
|
|
func (m *Messenger) PendingRequestsToJoinForCommunity(id types.HexBytes) ([]*communities.RequestToJoin, error) {
|
|
return m.communitiesManager.PendingRequestsToJoinForCommunity(id)
|
|
}
|
|
|
|
func (m *Messenger) RemoveUserFromCommunity(id types.HexBytes, pkString string) (*MessengerResponse, error) {
|
|
publicKey, err := common.HexToPubkey(pkString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.communitiesManager.RemoveUserFromCommunity(id, publicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) BanUserFromCommunity(request *requests.BanUserFromCommunity) (*MessengerResponse, error) {
|
|
community, err := m.communitiesManager.BanUserFromCommunity(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &MessengerResponse{}
|
|
response, err = m.DeclineAllPendingGroupInvitesFromUser(response, request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.AddCommunity(community)
|
|
return response, nil
|
|
}
|
|
|
|
// RequestCommunityInfoFromMailserver installs filter for community and requests its details
|
|
// from mailserver. It waits until it has the community before returning it
|
|
func (m *Messenger) RequestCommunityInfoFromMailserver(communityID string) (*communities.Community, error) {
|
|
return m.requestCommunityInfoFromMailserver(communityID, true)
|
|
}
|
|
|
|
// RequestCommunityInfoFromMailserverAsync installs filter for community and requests its details
|
|
// from mailserver. When response received it will be passed through signals handler
|
|
func (m *Messenger) RequestCommunityInfoFromMailserverAsync(communityID string) error {
|
|
_, err := m.requestCommunityInfoFromMailserver(communityID, false)
|
|
return err
|
|
}
|
|
|
|
// RequestCommunityInfoFromMailserver installs filter for community and requests its details
|
|
// from mailserver. When response received it will be passed through signals handler
|
|
func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, waitForResponse bool) (*communities.Community, error) {
|
|
if _, ok := m.requestedCommunities[communityID]; ok {
|
|
return nil, nil
|
|
}
|
|
|
|
//If filter wasn't installed we create it and remember for deinstalling after
|
|
//response received
|
|
filter := m.transport.FilterByChatID(communityID)
|
|
if filter == nil {
|
|
filters, err := m.transport.InitPublicFilters([]string{communityID})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Can't install filter for community: %v", err)
|
|
}
|
|
if len(filters) != 1 {
|
|
return nil, fmt.Errorf("Unexpected amount of filters created")
|
|
}
|
|
filter = filters[0]
|
|
m.requestedCommunities[communityID] = filter
|
|
} else {
|
|
//we don't remember filter id associated with community because it was already installed
|
|
m.requestedCommunities[communityID] = nil
|
|
}
|
|
|
|
to := uint32(m.transport.GetCurrentTime() / 1000)
|
|
from := to - oneMonthInSeconds
|
|
|
|
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
|
|
|
|
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.Topic}}
|
|
m.logger.Info("Requesting historic")
|
|
err := m.processMailserverBatch(batch)
|
|
return nil, err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !waitForResponse {
|
|
return nil, nil
|
|
}
|
|
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
defer cancel()
|
|
var community *communities.Community
|
|
|
|
fetching := true
|
|
|
|
for fetching {
|
|
select {
|
|
case <-time.After(200 * time.Millisecond):
|
|
//send signal to client that message status updated
|
|
community, err = m.communitiesManager.GetByIDString(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community != nil && community.Name() != "" && community.DescriptionText() != "" {
|
|
fetching = false
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
fetching = false
|
|
}
|
|
}
|
|
|
|
if community == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
//if there is no info helpful for client, we don't post it
|
|
if community.Name() == "" && community.DescriptionText() == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
m.forgetCommunityRequest(communityID)
|
|
|
|
return community, nil
|
|
}
|
|
|
|
// forgetCommunityRequest removes community from requested ones and removes filter
|
|
func (m *Messenger) forgetCommunityRequest(communityID string) {
|
|
filter, ok := m.requestedCommunities[communityID]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if filter != nil {
|
|
err := m.transport.RemoveFilters([]*transport.Filter{filter})
|
|
if err != nil {
|
|
m.logger.Warn("cant remove filter", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
delete(m.requestedCommunities, communityID)
|
|
}
|
|
|
|
// passStoredCommunityInfoToSignalHandler calls signal handler with community info
|
|
func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
|
|
if m.config.messengerSignalsHandler == nil {
|
|
return
|
|
}
|
|
|
|
//send signal to client that message status updated
|
|
community, err := m.communitiesManager.GetByIDString(communityID)
|
|
if community == nil {
|
|
return
|
|
}
|
|
|
|
//if there is no info helpful for client, we don't post it
|
|
if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 {
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
m.config.messengerSignalsHandler.CommunityInfoFound(community)
|
|
m.forgetCommunityRequest(communityID)
|
|
}
|
|
|
|
// handleCommunityDescription handles an community description
|
|
func (m *Messenger) handleCommunityDescription(state *ReceivedMessageState, signer *ecdsa.PublicKey, description protobuf.CommunityDescription, rawPayload []byte) error {
|
|
communityResponse, err := m.communitiesManager.HandleCommunityDescriptionMessage(signer, &description, rawPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
community := communityResponse.Community
|
|
|
|
state.Response.AddCommunity(community)
|
|
state.Response.CommunityChanges = append(state.Response.CommunityChanges, communityResponse.Changes)
|
|
|
|
// If we haven't joined the org, nothing to do
|
|
if !community.Joined() {
|
|
return nil
|
|
}
|
|
|
|
// Update relevant chats names and add new ones
|
|
// Currently removal is not supported
|
|
chats := CreateCommunityChats(community, state.Timesource)
|
|
var chatIDs []string
|
|
for i, chat := range chats {
|
|
|
|
oldChat, ok := state.AllChats.Load(chat.ID)
|
|
if !ok {
|
|
// Beware, don't use the reference in the range (i.e chat) as it's a shallow copy
|
|
state.AllChats.Store(chat.ID, chats[i])
|
|
|
|
state.Response.AddChat(chat)
|
|
chatIDs = append(chatIDs, chat.ID)
|
|
// Update name, currently is the only field is mutable
|
|
} else if oldChat.Name != chat.Name ||
|
|
oldChat.Description != chat.Description ||
|
|
oldChat.Emoji != chat.Emoji ||
|
|
oldChat.Color != chat.Color {
|
|
oldChat.Name = chat.Name
|
|
oldChat.Description = chat.Description
|
|
oldChat.Emoji = chat.Emoji
|
|
oldChat.Color = chat.Color
|
|
// TODO(samyoul) remove storing of an updated reference pointer?
|
|
state.AllChats.Store(chat.ID, oldChat)
|
|
state.Response.AddChat(chat)
|
|
}
|
|
}
|
|
|
|
// Load transport filters
|
|
filters, err := m.transport.InitPublicFilters(chatIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = m.scheduleSyncFilters(filters)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) handleSyncCommunity(messageState *ReceivedMessageState, syncCommunity protobuf.SyncCommunity) error {
|
|
logger := m.logger.Named("handleSyncCommunity")
|
|
|
|
// Should handle community
|
|
shouldHandle, err := m.communitiesManager.ShouldHandleSyncCommunity(&syncCommunity)
|
|
if err != nil {
|
|
logger.Debug("m.communitiesManager.ShouldHandleSyncCommunity error", zap.Error(err))
|
|
return err
|
|
}
|
|
logger.Debug("ShouldHandleSyncCommunity result", zap.Bool("shouldHandle", shouldHandle))
|
|
if !shouldHandle {
|
|
return nil
|
|
}
|
|
|
|
// Handle any community requests to join.
|
|
// MUST BE HANDLED BEFORE DESCRIPTION!
|
|
pending := false
|
|
for _, rtj := range syncCommunity.RequestsToJoin {
|
|
req := new(communities.RequestToJoin)
|
|
req.InitFromSyncProtobuf(rtj)
|
|
|
|
if req.State == communities.RequestToJoinStatePending {
|
|
pending = true
|
|
}
|
|
|
|
err = m.communitiesManager.SaveRequestToJoin(req)
|
|
if err != nil && err != communities.ErrOldRequestToJoin {
|
|
logger.Debug("m.communitiesManager.SaveRequestToJoin error", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
logger.Debug("community requests to join pending state", zap.Bool("pending", pending))
|
|
|
|
// Don't use the public key of the private key, uncompress the community id
|
|
orgPubKey, err := crypto.DecompressPubkey(syncCommunity.Id)
|
|
if err != nil {
|
|
logger.Debug("crypto.DecompressPubkey error", zap.Error(err))
|
|
return err
|
|
}
|
|
logger.Debug("crypto.DecompressPubkey result", zap.Any("orgPubKey", orgPubKey))
|
|
|
|
var amm protobuf.ApplicationMetadataMessage
|
|
err = proto.Unmarshal(syncCommunity.Description, &amm)
|
|
if err != nil {
|
|
logger.Debug("proto.Unmarshal protobuf.ApplicationMetadataMessage error", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
var cd protobuf.CommunityDescription
|
|
err = proto.Unmarshal(amm.Payload, &cd)
|
|
if err != nil {
|
|
logger.Debug("proto.Unmarshal protobuf.CommunityDescription error", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
err = m.handleCommunityDescription(messageState, orgPubKey, cd, syncCommunity.Description)
|
|
if err != nil {
|
|
logger.Debug("m.handleCommunityDescription error", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// associate private key with community if set
|
|
if syncCommunity.PrivateKey != nil {
|
|
orgPrivKey, err := crypto.ToECDSA(syncCommunity.PrivateKey)
|
|
if err != nil {
|
|
logger.Debug("crypto.ToECDSA", zap.Error(err))
|
|
return err
|
|
}
|
|
err = m.communitiesManager.SetPrivateKey(syncCommunity.Id, orgPrivKey)
|
|
if err != nil {
|
|
logger.Debug("m.communitiesManager.SetPrivateKey", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
// if we are not waiting for approval, join or leave the community
|
|
if !pending {
|
|
var mr *MessengerResponse
|
|
if syncCommunity.Joined {
|
|
mr, err = m.joinCommunity(context.Background(), syncCommunity.Id)
|
|
if err != nil {
|
|
logger.Debug("m.joinCommunity error", zap.Error(err))
|
|
return err
|
|
}
|
|
} else {
|
|
mr, err = m.leaveCommunity(syncCommunity.Id)
|
|
if err != nil {
|
|
logger.Debug("m.leaveCommunity error", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
err = messageState.Response.Merge(mr)
|
|
if err != nil {
|
|
logger.Debug("messageState.Response.Merge error", zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
// update the clock value
|
|
err = m.communitiesManager.SetSyncClock(syncCommunity.Id, syncCommunity.Clock)
|
|
if err != nil {
|
|
logger.Debug("m.communitiesManager.SetSyncClock", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) {
|
|
|
|
for _, c := range communities {
|
|
|
|
if c.Joined() {
|
|
settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to get community settings", zap.Error(err))
|
|
continue
|
|
}
|
|
if !settings.HistoryArchiveSupportEnabled {
|
|
continue
|
|
}
|
|
|
|
filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to get community chats filters", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if len(filters) == 0 {
|
|
m.logger.Debug("no filters or chats for this community starting interval", zap.String("id", c.IDString()))
|
|
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
|
|
continue
|
|
}
|
|
|
|
topics := []types.TopicType{}
|
|
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
// First we need to know the timestamp of the latest waku message
|
|
// we've received for this community, so we can request messages we've
|
|
// possibly missed since then
|
|
latestWakuMessageTimestamp, err := m.communitiesManager.GetLatestWakuMessageTimestamp(topics)
|
|
if err != nil {
|
|
m.logger.Debug("failed to get Latest waku message timestamp", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if latestWakuMessageTimestamp == 0 {
|
|
// This means we don't have any waku messages for this community
|
|
// yet, either because no messages were sent in the community so far,
|
|
// or because messages haven't reached this node
|
|
//
|
|
// In this case we default to requesting messages from the store nodes
|
|
// for the past 30 days
|
|
latestWakuMessageTimestamp = uint64(time.Now().AddDate(0, 0, -30).Unix())
|
|
}
|
|
|
|
// Request possibly missed waku messages for community
|
|
_, err = m.syncFiltersFrom(filters, uint32(latestWakuMessageTimestamp))
|
|
if err != nil {
|
|
m.logger.Debug("failed to request missing messages", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
// We figure out the end date of the last created archive and schedule
|
|
// the interval for creating future archives
|
|
// If the last end date is at least `interval` ago, we create an archive immediately first
|
|
lastArchiveEndDateTimestamp, err := m.communitiesManager.GetHistoryArchivePartitionStartTimestamp(c.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to get archive partition start timestamp", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
to := time.Now()
|
|
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
|
|
durationSinceLastArchive := to.Sub(lastArchiveEndDate)
|
|
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// No prior messages to be archived, so we just kick off the archive creation loop
|
|
// for future archives
|
|
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
|
|
} else if durationSinceLastArchive < messageArchiveInterval {
|
|
// Last archive is less than `interval` old, wait until `interval` is complete,
|
|
// then create archive and kick off archive creation loop for future archives
|
|
// Seed current archive in the meantime
|
|
err := m.communitiesManager.SeedHistoryArchiveTorrent(c.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to seed history archive", zap.Error(err))
|
|
}
|
|
timeToNextInterval := messageArchiveInterval - durationSinceLastArchive
|
|
|
|
m.logger.Debug("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
|
|
time.AfterFunc(timeToNextInterval, func() {
|
|
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval)
|
|
if err != nil {
|
|
m.logger.Debug("failed to get create and seed history archive", zap.Error(err))
|
|
}
|
|
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
|
|
})
|
|
} else {
|
|
// Looks like the last archive was generated more than `interval`
|
|
// ago, so lets create a new archive now and then schedule the archive
|
|
// creation loop
|
|
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval)
|
|
if err != nil {
|
|
m.logger.Debug("failed to get create and seed history archive", zap.Error(err))
|
|
}
|
|
|
|
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
|
|
|
|
community, err := m.communitiesManager.GetByIDString(communityID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
magnetLinkMessage := &protobuf.CommunityMessageArchiveMagnetlink{
|
|
Clock: m.getTimesource().GetCurrentTime(),
|
|
MagnetUri: magnetlink,
|
|
}
|
|
|
|
encodedMessage, err := proto.Marshal(magnetLinkMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
chatID := community.MagnetlinkMessageChannelID()
|
|
rawMessage := common.RawMessage{
|
|
LocalChatID: chatID,
|
|
Sender: community.PrivateKey(),
|
|
Payload: encodedMessage,
|
|
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_ARCHIVE_MAGNETLINK,
|
|
SkipGroupMessageWrap: true,
|
|
}
|
|
|
|
_, err = m.sender.SendPublic(context.Background(), chatID, rawMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.communitiesManager.UpdateCommunityDescriptionMagnetlinkMessageClock(community.ID(), magnetLinkMessage.Clock)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return m.communitiesManager.UpdateMagnetlinkMessageClock(community.ID(), magnetLinkMessage.Clock)
|
|
}
|
|
|
|
func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error {
|
|
nodeConfig, err := m.settings.GetNodeConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if nodeConfig.TorrentConfig.Enabled {
|
|
return nil
|
|
}
|
|
|
|
nodeConfig.TorrentConfig.Enabled = true
|
|
err = m.settings.SaveSetting("node-config", nodeConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m.config.torrentConfig = &nodeConfig.TorrentConfig
|
|
m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
|
|
err = m.communitiesManager.StartTorrentClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
communities, err := m.communitiesManager.Created()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(communities) > 0 {
|
|
go m.InitHistoryArchiveTasks(communities)
|
|
}
|
|
m.config.messengerSignalsHandler.HistoryArchivesProtocolEnabled()
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error {
|
|
|
|
nodeConfig, err := m.settings.GetNodeConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !nodeConfig.TorrentConfig.Enabled {
|
|
return nil
|
|
}
|
|
|
|
m.communitiesManager.StopTorrentClient()
|
|
|
|
nodeConfig.TorrentConfig.Enabled = false
|
|
err = m.settings.SaveSetting("node-config", nodeConfig)
|
|
m.config.torrentConfig = &nodeConfig.TorrentConfig
|
|
m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.config.messengerSignalsHandler.HistoryArchivesProtocolDisabled()
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) GetCommunitiesSettings() ([]communities.CommunitySettings, error) {
|
|
settings, err := m.communitiesManager.GetCommunitiesSettings()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return settings, nil
|
|
}
|