mirror of
https://github.com/status-im/status-go.git
synced 2025-01-28 07:27:00 +00:00
f31e40264e
This commit refactors the discord import tool such that, instead of loading all data to be imported into memory at once, it will now perform the import on a per file basis. This improves the memory pressure for the node performing the import and seems to increase its performance as well.
2557 lines
70 KiB
Go
2557 lines
70 KiB
Go
package communities
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/anacrolix/torrent"
|
|
"github.com/anacrolix/torrent/bencode"
|
|
"github.com/anacrolix/torrent/metainfo"
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/params"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/encryption"
|
|
"github.com/status-im/status-go/protocol/ens"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/requests"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
"github.com/status-im/status-go/signal"
|
|
)
|
|
|
|
var defaultAnnounceList = [][]string{
|
|
{"udp://tracker.opentrackr.org:1337/announce"},
|
|
{"udp://tracker.openbittorrent.com:6969/announce"},
|
|
}
|
|
var pieceLength = 100 * 1024
|
|
|
|
const maxArchiveSizeInBytes = 30000000
|
|
|
|
var ErrTorrentTimedout = errors.New("torrent has timed out")
|
|
|
|
type Manager struct {
|
|
persistence *Persistence
|
|
encryptor *encryption.Protocol
|
|
ensSubscription chan []*ens.VerificationRecord
|
|
subscriptions []chan *Subscription
|
|
ensVerifier *ens.Verifier
|
|
identity *ecdsa.PrivateKey
|
|
logger *zap.Logger
|
|
stdoutLogger *zap.Logger
|
|
transport *transport.Transport
|
|
quit chan struct{}
|
|
torrentConfig *params.TorrentConfig
|
|
torrentClient *torrent.Client
|
|
historyArchiveTasksWaitGroup sync.WaitGroup
|
|
historyArchiveTasks map[string]chan struct{}
|
|
torrentTasks map[string]metainfo.Hash
|
|
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
|
|
}
|
|
|
|
type HistoryArchiveDownloadTask struct {
|
|
CancelChan chan struct{}
|
|
Waiter sync.WaitGroup
|
|
m sync.RWMutex
|
|
Cancelled bool
|
|
}
|
|
|
|
func (t *HistoryArchiveDownloadTask) IsCancelled() bool {
|
|
t.m.RLock()
|
|
defer t.m.RUnlock()
|
|
return t.Cancelled
|
|
}
|
|
|
|
func (t *HistoryArchiveDownloadTask) Cancel() {
|
|
t.m.Lock()
|
|
defer t.m.Unlock()
|
|
t.Cancelled = true
|
|
close(t.CancelChan)
|
|
}
|
|
|
|
func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
|
|
if identity == nil {
|
|
return nil, errors.New("empty identity")
|
|
}
|
|
|
|
var err error
|
|
if logger == nil {
|
|
if logger, err = zap.NewDevelopment(); err != nil {
|
|
return nil, errors.Wrap(err, "failed to create a logger")
|
|
}
|
|
}
|
|
|
|
stdoutLogger, err := zap.NewDevelopment()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create archive logger")
|
|
}
|
|
|
|
manager := &Manager{
|
|
logger: logger,
|
|
stdoutLogger: stdoutLogger,
|
|
encryptor: encryptor,
|
|
identity: identity,
|
|
quit: make(chan struct{}),
|
|
transport: transport,
|
|
torrentConfig: torrentConfig,
|
|
historyArchiveTasks: make(map[string]chan struct{}),
|
|
torrentTasks: make(map[string]metainfo.Hash),
|
|
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
|
|
persistence: &Persistence{
|
|
logger: logger,
|
|
db: db,
|
|
},
|
|
}
|
|
|
|
if verifier != nil {
|
|
|
|
sub := verifier.Subscribe()
|
|
manager.ensSubscription = sub
|
|
manager.ensVerifier = verifier
|
|
}
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
func (m *Manager) LogStdout(msg string, fields ...zap.Field) {
|
|
m.stdoutLogger.Info(msg, fields...)
|
|
m.logger.Debug(msg, fields...)
|
|
}
|
|
|
|
type archiveMDSlice []*archiveMetadata
|
|
|
|
type archiveMetadata struct {
|
|
hash string
|
|
from uint64
|
|
}
|
|
|
|
func (md archiveMDSlice) Len() int {
|
|
return len(md)
|
|
}
|
|
|
|
func (md archiveMDSlice) Swap(i, j int) {
|
|
md[i], md[j] = md[j], md[i]
|
|
}
|
|
|
|
func (md archiveMDSlice) Less(i, j int) bool {
|
|
return md[i].from > md[j].from
|
|
}
|
|
|
|
type Subscription struct {
|
|
Community *Community
|
|
Invitations []*protobuf.CommunityInvitation
|
|
CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal
|
|
HistoryArchivesCreatedSignal *signal.HistoryArchivesCreatedSignal
|
|
NoHistoryArchivesCreatedSignal *signal.NoHistoryArchivesCreatedSignal
|
|
HistoryArchivesSeedingSignal *signal.HistoryArchivesSeedingSignal
|
|
HistoryArchivesUnseededSignal *signal.HistoryArchivesUnseededSignal
|
|
HistoryArchiveDownloadedSignal *signal.HistoryArchiveDownloadedSignal
|
|
DownloadingHistoryArchivesStartedSignal *signal.DownloadingHistoryArchivesStartedSignal
|
|
DownloadingHistoryArchivesFinishedSignal *signal.DownloadingHistoryArchivesFinishedSignal
|
|
ImportingHistoryArchiveMessagesSignal *signal.ImportingHistoryArchiveMessagesSignal
|
|
}
|
|
|
|
type CommunityResponse struct {
|
|
Community *Community `json:"community"`
|
|
Changes *CommunityChanges `json:"changes"`
|
|
}
|
|
|
|
func (m *Manager) Subscribe() chan *Subscription {
|
|
subscription := make(chan *Subscription, 100)
|
|
m.subscriptions = append(m.subscriptions, subscription)
|
|
return subscription
|
|
}
|
|
|
|
func (m *Manager) Start() error {
|
|
if m.ensVerifier != nil {
|
|
m.runENSVerificationLoop()
|
|
}
|
|
|
|
if m.torrentConfig != nil && m.torrentConfig.Enabled {
|
|
err := m.StartTorrentClient()
|
|
if err != nil {
|
|
m.LogStdout("couldn't start torrent client", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) runENSVerificationLoop() {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-m.quit:
|
|
m.logger.Debug("quitting ens verification loop")
|
|
return
|
|
case records, more := <-m.ensSubscription:
|
|
if !more {
|
|
m.logger.Debug("no more ens records, quitting")
|
|
return
|
|
}
|
|
m.logger.Info("received records", zap.Any("records", records))
|
|
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (m *Manager) Stop() error {
|
|
close(m.quit)
|
|
for _, c := range m.subscriptions {
|
|
close(c)
|
|
}
|
|
m.StopTorrentClient()
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) SetTorrentConfig(config *params.TorrentConfig) {
|
|
m.torrentConfig = config
|
|
}
|
|
|
|
func (m *Manager) StartTorrentClient() error {
|
|
if m.torrentConfig == nil {
|
|
return fmt.Errorf("Can't start torrent client: missing torrentConfig")
|
|
}
|
|
|
|
if m.TorrentClientStarted() {
|
|
return nil
|
|
}
|
|
|
|
config := torrent.NewDefaultClientConfig()
|
|
config.SetListenAddr(":" + fmt.Sprint(m.torrentConfig.Port))
|
|
config.Seed = true
|
|
|
|
config.DataDir = m.torrentConfig.DataDir
|
|
|
|
if _, err := os.Stat(m.torrentConfig.DataDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(m.torrentConfig.DataDir, 0700)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
m.logger.Info("Starting torrent client", zap.Any("port", m.torrentConfig.Port))
|
|
// Instantiating the client will make it bootstrap and listen eagerly,
|
|
// so no go routine is needed here
|
|
client, err := torrent.NewClient(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.torrentClient = client
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StopTorrentClient() []error {
|
|
if m.TorrentClientStarted() {
|
|
m.StopHistoryArchiveTasksIntervals()
|
|
m.logger.Info("Stopping torrent client")
|
|
errs := m.torrentClient.Close()
|
|
if len(errs) > 0 {
|
|
return errs
|
|
}
|
|
m.torrentClient = nil
|
|
}
|
|
return make([]error, 0)
|
|
}
|
|
|
|
func (m *Manager) TorrentClientStarted() bool {
|
|
return m.torrentClient != nil
|
|
}
|
|
|
|
func (m *Manager) publish(subscription *Subscription) {
|
|
for _, s := range m.subscriptions {
|
|
select {
|
|
case s <- subscription:
|
|
default:
|
|
m.logger.Warn("subscription channel full, dropping message")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) All() ([]*Community, error) {
|
|
return m.persistence.AllCommunities(&m.identity.PublicKey)
|
|
}
|
|
|
|
type KnownCommunitiesResponse struct {
|
|
ContractCommunities []string `json:"contractCommunities"`
|
|
Descriptions map[string]*Community `json:"communities"`
|
|
UnknownCommunities []string `json:"unknownCommunities"`
|
|
}
|
|
|
|
func (m *Manager) GetStoredDescriptionForCommunities(communityIDs []types.HexBytes) (response *KnownCommunitiesResponse, err error) {
|
|
response = &KnownCommunitiesResponse{
|
|
Descriptions: make(map[string]*Community),
|
|
}
|
|
|
|
for i := range communityIDs {
|
|
communityID := communityIDs[i].String()
|
|
var community *Community
|
|
community, err = m.GetByID(communityIDs[i])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
response.ContractCommunities = append(response.ContractCommunities, communityID)
|
|
|
|
if community != nil {
|
|
response.Descriptions[community.IDString()] = community
|
|
} else {
|
|
response.UnknownCommunities = append(response.UnknownCommunities, communityID)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (m *Manager) Joined() ([]*Community, error) {
|
|
return m.persistence.JoinedCommunities(&m.identity.PublicKey)
|
|
}
|
|
|
|
func (m *Manager) Spectated() ([]*Community, error) {
|
|
return m.persistence.SpectatedCommunities(&m.identity.PublicKey)
|
|
}
|
|
|
|
func (m *Manager) JoinedAndPendingCommunitiesWithRequests() ([]*Community, error) {
|
|
return m.persistence.JoinedAndPendingCommunitiesWithRequests(&m.identity.PublicKey)
|
|
}
|
|
|
|
func (m *Manager) DeletedCommunities() ([]*Community, error) {
|
|
return m.persistence.DeletedCommunities(&m.identity.PublicKey)
|
|
}
|
|
|
|
func (m *Manager) Created() ([]*Community, error) {
|
|
return m.persistence.CreatedCommunities(&m.identity.PublicKey)
|
|
}
|
|
|
|
// CreateCommunity takes a description, generates an ID for it, saves it and return it
|
|
func (m *Manager) CreateCommunity(request *requests.CreateCommunity, publish bool) (*Community, error) {
|
|
|
|
description, err := request.ToCommunityDescription()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description.Members = make(map[string]*protobuf.CommunityMember)
|
|
description.Members[common.PubkeyToHex(&m.identity.PublicKey)] = &protobuf.CommunityMember{Roles: []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL}}
|
|
|
|
err = ValidateCommunityDescription(description)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description.Clock = 1
|
|
|
|
key, err := crypto.GenerateKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config := Config{
|
|
ID: &key.PublicKey,
|
|
PrivateKey: key,
|
|
Logger: m.logger,
|
|
Joined: true,
|
|
MemberIdentity: &m.identity.PublicKey,
|
|
CommunityDescription: description,
|
|
}
|
|
community, err := New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We join any community we create
|
|
community.Join()
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if publish {
|
|
m.publish(&Subscription{Community: community})
|
|
}
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) DeleteCommunity(id types.HexBytes) error {
|
|
err := m.persistence.DeleteCommunity(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return m.persistence.DeleteCommunitySettings(id)
|
|
}
|
|
|
|
// EditCommunity takes a description, updates the community with the description,
|
|
// saves it and returns it
|
|
func (m *Manager) EditCommunity(request *requests.EditCommunity) (*Community, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
if !community.IsAdmin() {
|
|
return nil, errors.New("not an admin")
|
|
}
|
|
|
|
newDescription, err := request.ToCommunityDescription()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Can't create community description: %v", err)
|
|
}
|
|
|
|
// If permissions weren't explicitly set on original request, use existing ones
|
|
if newDescription.Permissions.Access == protobuf.CommunityPermissions_UNKNOWN_ACCESS {
|
|
newDescription.Permissions.Access = community.config.CommunityDescription.Permissions.Access
|
|
}
|
|
// Use existing images for the entries that were not updated
|
|
// NOTE: This will NOT allow deletion of the community image; it will need to
|
|
// be handled separately.
|
|
for imageName := range community.config.CommunityDescription.Identity.Images {
|
|
_, exists := newDescription.Identity.Images[imageName]
|
|
if !exists {
|
|
// If no image was set in ToCommunityDescription then Images is nil.
|
|
if newDescription.Identity.Images == nil {
|
|
newDescription.Identity.Images = make(map[string]*protobuf.IdentityImage)
|
|
}
|
|
newDescription.Identity.Images[imageName] = community.config.CommunityDescription.Identity.Images[imageName]
|
|
}
|
|
}
|
|
// TODO: handle delete image (if needed)
|
|
|
|
err = ValidateCommunityDescription(newDescription)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Edit the community values
|
|
community.Edit(newDescription)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !community.IsAdmin() {
|
|
return nil, errors.New("not an admin")
|
|
}
|
|
|
|
return community.config.PrivateKey, nil
|
|
}
|
|
|
|
func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
|
|
communityID := crypto.CompressPubkey(&key.PublicKey)
|
|
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community == nil {
|
|
description := &protobuf.CommunityDescription{
|
|
Permissions: &protobuf.CommunityPermissions{},
|
|
}
|
|
|
|
config := Config{
|
|
ID: &key.PublicKey,
|
|
PrivateKey: key,
|
|
Logger: m.logger,
|
|
Joined: true,
|
|
MemberIdentity: &m.identity.PublicKey,
|
|
CommunityDescription: description,
|
|
}
|
|
community, err = New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
community.config.PrivateKey = key
|
|
}
|
|
|
|
community.Join()
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool, thirdPartyID string) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
chatID := uuid.New().String()
|
|
if thirdPartyID != "" {
|
|
chatID = chatID + thirdPartyID
|
|
}
|
|
|
|
changes, err := community.CreateChat(chatID, chat)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
if publish {
|
|
m.publish(&Subscription{Community: community})
|
|
}
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) EditChat(communityID types.HexBytes, chatID string, chat *protobuf.CommunityChat) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(chatID, communityID.String()) {
|
|
chatID = strings.TrimPrefix(chatID, communityID.String())
|
|
}
|
|
|
|
changes, err := community.EditChat(chatID, chat)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) DeleteChat(communityID types.HexBytes, chatID string) (*Community, *protobuf.CommunityDescription, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(chatID, communityID.String()) {
|
|
chatID = strings.TrimPrefix(chatID, communityID.String())
|
|
}
|
|
description, err := community.DeleteChat(chatID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, description, nil
|
|
}
|
|
|
|
func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory, publish bool) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
categoryID := uuid.New().String()
|
|
if request.ThirdPartyID != "" {
|
|
categoryID = categoryID + request.ThirdPartyID
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
for i, cid := range request.ChatIDs {
|
|
if strings.HasPrefix(cid, request.CommunityID.String()) {
|
|
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
|
|
}
|
|
}
|
|
|
|
changes, err := community.CreateCategory(categoryID, request.CategoryName, request.ChatIDs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
if publish {
|
|
m.publish(&Subscription{Community: community})
|
|
}
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) EditCategory(request *requests.EditCommunityCategory) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
for i, cid := range request.ChatIDs {
|
|
if strings.HasPrefix(cid, request.CommunityID.String()) {
|
|
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
|
|
}
|
|
}
|
|
|
|
changes, err := community.EditCategory(request.CategoryID, request.CategoryName, request.ChatIDs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) EditChatFirstMessageTimestamp(communityID types.HexBytes, chatID string, timestamp uint32) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(chatID, communityID.String()) {
|
|
chatID = strings.TrimPrefix(chatID, communityID.String())
|
|
}
|
|
|
|
changes, err := community.UpdateChatFirstMessageTimestamp(chatID, timestamp)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) ReorderCategories(request *requests.ReorderCommunityCategories) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
changes, err := community.ReorderCategories(request.CategoryID, request.Position)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) ReorderChat(request *requests.ReorderCommunityChat) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(request.ChatID, request.CommunityID.String()) {
|
|
request.ChatID = strings.TrimPrefix(request.ChatID, request.CommunityID.String())
|
|
}
|
|
|
|
changes, err := community.ReorderChat(request.CategoryID, request.ChatID, request.Position)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) DeleteCategory(request *requests.DeleteCommunityCategory) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
changes, err := community.DeleteCategory(request.CategoryID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte) (*CommunityResponse, error) {
|
|
id := crypto.CompressPubkey(signer)
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community == nil {
|
|
config := Config{
|
|
CommunityDescription: description,
|
|
Logger: m.logger,
|
|
MarshaledCommunityDescription: payload,
|
|
MemberIdentity: &m.identity.PublicKey,
|
|
ID: signer,
|
|
}
|
|
|
|
community, err = New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
changes, err := community.UpdateCommunityDescription(signer, description, payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
hasCommunityArchiveInfo, err := m.persistence.HasCommunityArchiveInfo(community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cdMagnetlinkClock := community.config.CommunityDescription.ArchiveMagnetlinkClock
|
|
if !hasCommunityArchiveInfo {
|
|
err = m.persistence.SaveCommunityArchiveInfo(community.ID(), cdMagnetlinkClock, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
magnetlinkClock, err := m.persistence.GetMagnetlinkMessageClock(community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if cdMagnetlinkClock > magnetlinkClock {
|
|
err = m.persistence.UpdateMagnetlinkMessageClock(community.ID(), cdMagnetlinkClock)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
pkString := common.PubkeyToHex(&m.identity.PublicKey)
|
|
|
|
// If the community require membership, we set whether we should leave/join the community after a state change
|
|
if community.InvitationOnly() || community.OnRequest() || community.AcceptRequestToJoinAutomatically() {
|
|
if changes.HasNewMember(pkString) {
|
|
hasPendingRequest, err := m.persistence.HasPendingRequestsToJoinForUserAndCommunity(pkString, changes.Community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// If there's any pending request, we should join the community
|
|
// automatically
|
|
changes.ShouldMemberJoin = hasPendingRequest
|
|
}
|
|
|
|
if changes.HasMemberLeft(pkString) {
|
|
// If we joined previously the community, we should leave it
|
|
changes.ShouldMemberLeave = community.Joined()
|
|
}
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We mark our requests as completed, though maybe we should mark
|
|
// any request for any user that has been added as completed
|
|
if err := m.markRequestToJoin(&m.identity.PublicKey, community); err != nil {
|
|
return nil, err
|
|
}
|
|
// Check if there's a change and we should be joining
|
|
|
|
return &CommunityResponse{
|
|
Community: community,
|
|
Changes: changes,
|
|
}, nil
|
|
}
|
|
|
|
// TODO: This is not fully implemented, we want to save the grant passed at
|
|
// this stage and make sure it's used when publishing.
|
|
func (m *Manager) HandleCommunityInvitation(signer *ecdsa.PublicKey, invitation *protobuf.CommunityInvitation, payload []byte) (*CommunityResponse, error) {
|
|
m.logger.Debug("Handling wrapped community description message")
|
|
|
|
community, err := m.HandleWrappedCommunityDescriptionMessage(payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Save grant
|
|
|
|
return community, nil
|
|
}
|
|
|
|
// markRequestToJoin marks all the pending requests to join as completed
|
|
// if we are members
|
|
func (m *Manager) markRequestToJoin(pk *ecdsa.PublicKey, community *Community) error {
|
|
if community.HasMember(pk) {
|
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(pk), community.ID(), RequestToJoinStateAccepted)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) markRequestToJoinAsCanceled(pk *ecdsa.PublicKey, community *Community) error {
|
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(pk), community.ID(), RequestToJoinStateCanceled)
|
|
}
|
|
|
|
func (m *Manager) SetMuted(id types.HexBytes, muted bool) error {
|
|
return m.persistence.SetMuted(id, muted)
|
|
}
|
|
|
|
func (m *Manager) CancelRequestToJoin(request *requests.CancelRequestToJoinCommunity) (*RequestToJoin, *Community, error) {
|
|
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(dbRequest.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
pk, err := common.HexToPubkey(dbRequest.PublicKey)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if err := m.markRequestToJoinAsCanceled(pk, community); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return dbRequest, community, nil
|
|
}
|
|
|
|
func (m *Manager) AcceptRequestToJoin(request *requests.AcceptRequestToJoinCommunity) (*Community, error) {
|
|
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(dbRequest.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pk, err := common.HexToPubkey(dbRequest.PublicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = community.AddMember(pk, []protobuf.CommunityMember_Roles{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := m.markRequestToJoin(pk, community); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetRequestToJoin(ID types.HexBytes) (*RequestToJoin, error) {
|
|
return m.persistence.GetRequestToJoin(ID)
|
|
}
|
|
|
|
func (m *Manager) DeclineRequestToJoin(request *requests.DeclineRequestToJoinCommunity) error {
|
|
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.persistence.SetRequestToJoinState(dbRequest.PublicKey, dbRequest.CommunityID, RequestToJoinStateDeclined)
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityCancelRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityCancelRequestToJoin) (*RequestToJoin, error) {
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
err = m.markRequestToJoinAsCanceled(signer, community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestToJoin, err := m.persistence.GetRequestToJoinByPk(common.PubkeyToHex(signer), community.ID(), RequestToJoinStateCanceled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return requestToJoin, nil
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoin) (*RequestToJoin, error) {
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
if err := community.ValidateRequestToJoin(signer, request); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestToJoin := &RequestToJoin{
|
|
PublicKey: common.PubkeyToHex(signer),
|
|
Clock: request.Clock,
|
|
ENSName: request.EnsName,
|
|
CommunityID: request.CommunityId,
|
|
State: RequestToJoinStatePending,
|
|
}
|
|
|
|
requestToJoin.CalculateID()
|
|
|
|
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If user is already a member, then accept request automatically
|
|
// It may happen when member removes itself from community and then tries to rejoin
|
|
// More specifically, CommunityRequestToLeave may be delivered later than CommunityRequestToJoin, or not delivered at all
|
|
acceptAutomatically := community.AcceptRequestToJoinAutomatically() || community.HasMember(signer)
|
|
if acceptAutomatically {
|
|
err = m.markRequestToJoin(signer, community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
requestToJoin.State = RequestToJoinStateAccepted
|
|
}
|
|
|
|
return requestToJoin, nil
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoinResponse) error {
|
|
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if community == nil {
|
|
return ErrOrgNotFound
|
|
}
|
|
|
|
communityDescriptionBytes, err := proto.Marshal(request.Community)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We need to wrap `request.Community` in an `ApplicationMetadataMessage`
|
|
// of type `CommunityDescription` because `UpdateCommunityDescription` expects this.
|
|
//
|
|
// This is merely for marsheling/unmarsheling, hence we attaching a `Signature`
|
|
// is not needed.
|
|
metadataMessage := &protobuf.ApplicationMetadataMessage{
|
|
Payload: communityDescriptionBytes,
|
|
Type: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
|
|
}
|
|
|
|
appMetadataMsg, err := proto.Marshal(metadataMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = community.UpdateCommunityDescription(signer, request.Community, appMetadataMsg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if request.Accepted {
|
|
return m.markRequestToJoin(&m.identity.PublicKey, community)
|
|
}
|
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(&m.identity.PublicKey), community.ID(), RequestToJoinStateDeclined)
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityRequestToLeave(signer *ecdsa.PublicKey, proto *protobuf.CommunityRequestToLeave) error {
|
|
requestToLeave := NewRequestToLeave(common.PubkeyToHex(signer), proto)
|
|
if err := m.persistence.SaveRequestToLeave(requestToLeave); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure corresponding requestToJoin clock is older than requestToLeave
|
|
requestToJoin, err := m.persistence.GetRequestToJoin(requestToLeave.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if requestToJoin.Clock > requestToLeave.Clock {
|
|
return ErrOldRequestToLeave
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) HandleWrappedCommunityDescriptionMessage(payload []byte) (*CommunityResponse, error) {
|
|
m.logger.Debug("Handling wrapped community description message")
|
|
|
|
applicationMetadataMessage := &protobuf.ApplicationMetadataMessage{}
|
|
err := proto.Unmarshal(payload, applicationMetadataMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
|
|
return nil, ErrInvalidMessage
|
|
}
|
|
signer, err := applicationMetadataMessage.RecoverKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description := &protobuf.CommunityDescription{}
|
|
|
|
err = proto.Unmarshal(applicationMetadataMessage.Payload, description)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return m.HandleCommunityDescriptionMessage(signer, description, payload)
|
|
}
|
|
|
|
func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
community.Join()
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) SpectateCommunity(id types.HexBytes) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
community.Spectate()
|
|
if err = m.persistence.SaveCommunity(community); err != nil {
|
|
return nil, err
|
|
}
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
|
|
return m.persistence.GetMagnetlinkMessageClock(communityID)
|
|
}
|
|
|
|
func (m *Manager) GetRequestToJoinIDByPkAndCommunityID(pk *ecdsa.PublicKey, communityID []byte) ([]byte, error) {
|
|
return m.persistence.GetRequestToJoinIDByPkAndCommunityID(common.PubkeyToHex(pk), communityID)
|
|
}
|
|
|
|
func (m *Manager) UpdateCommunityDescriptionMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
|
|
community, err := m.GetByIDString(communityID.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
community.config.CommunityDescription.ArchiveMagnetlinkClock = clock
|
|
return m.persistence.SaveCommunity(community)
|
|
}
|
|
|
|
func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
|
|
return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock)
|
|
}
|
|
|
|
func (m *Manager) UpdateLastSeenMagnetlink(communityID types.HexBytes, magnetlinkURI string) error {
|
|
return m.persistence.UpdateLastSeenMagnetlink(communityID, magnetlinkURI)
|
|
}
|
|
|
|
func (m *Manager) GetLastSeenMagnetlink(communityID types.HexBytes) (string, error) {
|
|
return m.persistence.GetLastSeenMagnetlink(communityID)
|
|
}
|
|
|
|
func (m *Manager) LeaveCommunity(id types.HexBytes) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
community.RemoveOurselvesFromOrg(&m.identity.PublicKey)
|
|
community.Leave()
|
|
|
|
if err = m.persistence.SaveCommunity(community); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) inviteUsersToCommunity(community *Community, pks []*ecdsa.PublicKey) (*Community, error) {
|
|
var invitations []*protobuf.CommunityInvitation
|
|
for _, pk := range pks {
|
|
invitation, err := community.InviteUserToOrg(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// We mark the user request (if any) as completed
|
|
if err := m.markRequestToJoin(pk, community); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
invitations = append(invitations, invitation)
|
|
}
|
|
|
|
err := m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community, Invitations: invitations})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) InviteUsersToCommunity(communityID types.HexBytes, pks []*ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
return m.inviteUsersToCommunity(community, pks)
|
|
}
|
|
|
|
func (m *Manager) AddMemberOwnerToCommunity(communityID types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
err = community.AddMember(pk, []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) RemoveUserFromCommunity(id types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.RemoveUserFromOrg(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) UnbanUserFromCommunity(request *requests.UnbanUserFromCommunity) (*Community, error) {
|
|
id := request.CommunityID
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.UnbanUserFromCommunity(publicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) AddRoleToMember(request *requests.AddRoleToMember) (*Community, error) {
|
|
id := request.CommunityID
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
if !community.hasMember(publicKey) {
|
|
return nil, ErrMemberNotFound
|
|
}
|
|
|
|
_, err = community.AddRoleToMember(publicKey, request.Role)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) RemoveRoleFromMember(request *requests.RemoveRoleFromMember) (*Community, error) {
|
|
id := request.CommunityID
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
if !community.hasMember(publicKey) {
|
|
return nil, ErrMemberNotFound
|
|
}
|
|
|
|
_, err = community.RemoveRoleFromMember(publicKey, request.Role)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) BanUserFromCommunity(request *requests.BanUserFromCommunity) (*Community, error) {
|
|
id := request.CommunityID
|
|
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.BanUserFromCommunity(publicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetByID(id []byte) (*Community, error) {
|
|
return m.persistence.GetByID(&m.identity.PublicKey, id)
|
|
}
|
|
|
|
func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
|
id, err := types.DecodeHex(idString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return m.GetByID(id)
|
|
}
|
|
|
|
func (m *Manager) RequestToJoin(requester *ecdsa.PublicKey, request *requests.RequestToJoinCommunity) (*Community, *RequestToJoin, error) {
|
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// We don't allow requesting access if already joined
|
|
if community.Joined() {
|
|
return nil, nil, ErrAlreadyJoined
|
|
}
|
|
|
|
clock := uint64(time.Now().Unix())
|
|
requestToJoin := &RequestToJoin{
|
|
PublicKey: common.PubkeyToHex(requester),
|
|
Clock: clock,
|
|
ENSName: request.ENSName,
|
|
CommunityID: request.CommunityID,
|
|
State: RequestToJoinStatePending,
|
|
Our: true,
|
|
}
|
|
|
|
requestToJoin.CalculateID()
|
|
|
|
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
community.config.RequestedToJoinAt = uint64(time.Now().Unix())
|
|
community.AddRequestToJoin(requestToJoin)
|
|
|
|
return community, requestToJoin, nil
|
|
}
|
|
|
|
func (m *Manager) SaveRequestToJoin(request *RequestToJoin) error {
|
|
return m.persistence.SaveRequestToJoin(request)
|
|
}
|
|
|
|
func (m *Manager) CanceledRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*RequestToJoin, error) {
|
|
return m.persistence.CanceledRequestsToJoinForUser(common.PubkeyToHex(pk))
|
|
}
|
|
|
|
func (m *Manager) PendingRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*RequestToJoin, error) {
|
|
return m.persistence.PendingRequestsToJoinForUser(common.PubkeyToHex(pk))
|
|
}
|
|
|
|
func (m *Manager) PendingRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
|
|
m.logger.Info("fetching pending invitations", zap.String("community-id", id.String()))
|
|
return m.persistence.PendingRequestsToJoinForCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) DeclinedRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
|
|
m.logger.Info("fetching declined invitations", zap.String("community-id", id.String()))
|
|
return m.persistence.DeclinedRequestsToJoinForCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) CanceledRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
|
|
m.logger.Info("fetching canceled invitations", zap.String("community-id", id.String()))
|
|
return m.persistence.CanceledRequestsToJoinForCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) CanPost(pk *ecdsa.PublicKey, communityID string, chatID string, grant []byte) (bool, error) {
|
|
community, err := m.GetByIDString(communityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if community == nil {
|
|
return false, nil
|
|
}
|
|
return community.CanPost(pk, chatID, grant)
|
|
}
|
|
|
|
func (m *Manager) IsEncrypted(communityID string) (bool, error) {
|
|
community, err := m.GetByIDString(communityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return community.Encrypted(), nil
|
|
|
|
}
|
|
func (m *Manager) ShouldHandleSyncCommunity(community *protobuf.SyncCommunity) (bool, error) {
|
|
return m.persistence.ShouldHandleSyncCommunity(community)
|
|
}
|
|
|
|
func (m *Manager) ShouldHandleSyncCommunitySettings(communitySettings *protobuf.SyncCommunitySettings) (bool, error) {
|
|
return m.persistence.ShouldHandleSyncCommunitySettings(communitySettings)
|
|
}
|
|
|
|
func (m *Manager) HandleSyncCommunitySettings(syncCommunitySettings *protobuf.SyncCommunitySettings) (*CommunitySettings, error) {
|
|
id, err := types.DecodeHex(syncCommunitySettings.CommunityId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
settings, err := m.persistence.GetCommunitySettingsByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if settings == nil {
|
|
settings = &CommunitySettings{
|
|
CommunityID: syncCommunitySettings.CommunityId,
|
|
HistoryArchiveSupportEnabled: syncCommunitySettings.HistoryArchiveSupportEnabled,
|
|
Clock: syncCommunitySettings.Clock,
|
|
}
|
|
}
|
|
|
|
if syncCommunitySettings.Clock > settings.Clock {
|
|
settings.CommunityID = syncCommunitySettings.CommunityId
|
|
settings.HistoryArchiveSupportEnabled = syncCommunitySettings.HistoryArchiveSupportEnabled
|
|
settings.Clock = syncCommunitySettings.Clock
|
|
}
|
|
|
|
err = m.persistence.SaveCommunitySettings(*settings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return settings, nil
|
|
}
|
|
|
|
func (m *Manager) SetSyncClock(id []byte, clock uint64) error {
|
|
return m.persistence.SetSyncClock(id, clock)
|
|
}
|
|
|
|
func (m *Manager) SetPrivateKey(id []byte, privKey *ecdsa.PrivateKey) error {
|
|
return m.persistence.SetPrivateKey(id, privKey)
|
|
}
|
|
|
|
func (m *Manager) GetSyncedRawCommunity(id []byte) (*RawCommunityRow, error) {
|
|
return m.persistence.getSyncedRawCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) GetCommunitySettingsByID(id types.HexBytes) (*CommunitySettings, error) {
|
|
return m.persistence.GetCommunitySettingsByID(id)
|
|
}
|
|
|
|
func (m *Manager) GetCommunitiesSettings() ([]CommunitySettings, error) {
|
|
return m.persistence.GetCommunitiesSettings()
|
|
}
|
|
|
|
func (m *Manager) SaveCommunitySettings(settings CommunitySettings) error {
|
|
return m.persistence.SaveCommunitySettings(settings)
|
|
}
|
|
|
|
func (m *Manager) CommunitySettingsExist(id types.HexBytes) (bool, error) {
|
|
return m.persistence.CommunitySettingsExist(id)
|
|
}
|
|
|
|
func (m *Manager) DeleteCommunitySettings(id types.HexBytes) error {
|
|
return m.persistence.DeleteCommunitySettings(id)
|
|
}
|
|
|
|
func (m *Manager) UpdateCommunitySettings(settings CommunitySettings) error {
|
|
return m.persistence.UpdateCommunitySettings(settings)
|
|
}
|
|
|
|
func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) {
|
|
adminCommunities, err := m.Created()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chatIDs := make(map[string]bool)
|
|
for _, c := range adminCommunities {
|
|
if c.Joined() {
|
|
for _, id := range c.ChatIDs() {
|
|
chatIDs[id] = true
|
|
}
|
|
}
|
|
}
|
|
return chatIDs, nil
|
|
}
|
|
|
|
func (m *Manager) IsAdminCommunityByID(communityID types.HexBytes) (bool, error) {
|
|
pubKey, err := crypto.DecompressPubkey(communityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return m.IsAdminCommunity(pubKey)
|
|
}
|
|
|
|
func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
|
|
adminCommunities, err := m.Created()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, c := range adminCommunities {
|
|
if c.PrivateKey().PublicKey.Equal(pubKey) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
|
|
community, err := m.GetByID(crypto.CompressPubkey(pubKey))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return community != nil && community.Joined(), nil
|
|
}
|
|
|
|
func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
|
|
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filters := []*transport.Filter{}
|
|
for _, cid := range chatIDs {
|
|
filters = append(filters, m.transport.FilterByChatID(cid))
|
|
}
|
|
return filters, nil
|
|
}
|
|
|
|
func (m *Manager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.TopicType, error) {
|
|
filters, err := m.GetCommunityChatsFilters(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topics := []types.TopicType{}
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
func (m *Manager) StoreWakuMessage(message *types.Message) error {
|
|
return m.persistence.SaveWakuMessage(message)
|
|
}
|
|
|
|
func (m *Manager) StoreWakuMessages(messages []*types.Message) error {
|
|
return m.persistence.SaveWakuMessages(messages)
|
|
}
|
|
|
|
func (m *Manager) GetLatestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
|
|
return m.persistence.GetLatestWakuMessageTimestamp(topics)
|
|
}
|
|
|
|
func (m *Manager) GetOldestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
|
|
return m.persistence.GetOldestWakuMessageTimestamp(topics)
|
|
}
|
|
|
|
func (m *Manager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint64, error) {
|
|
return m.persistence.GetLastMessageArchiveEndDate(communityID)
|
|
}
|
|
|
|
func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
|
|
filters, err := m.GetCommunityChatsFilters(communityID)
|
|
if err != nil {
|
|
m.LogStdout("failed to get community chats filters", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
|
|
if len(filters) == 0 {
|
|
// If we don't have chat filters, we likely don't have any chats
|
|
// associated to this community, which means there's nothing more
|
|
// to do here
|
|
return 0, nil
|
|
}
|
|
|
|
topics := []types.TopicType{}
|
|
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
|
|
if err != nil {
|
|
m.LogStdout("failed to get last archive end date", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// If we don't have a tracked last message archive end date, it
|
|
// means we haven't created an archive before, which means
|
|
// the next thing to look at is the oldest waku message timestamp for
|
|
// this community
|
|
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
|
|
if err != nil {
|
|
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// This means there's no waku message stored for this community so far
|
|
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
|
|
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
|
|
return 0, nil
|
|
}
|
|
}
|
|
|
|
return lastArchiveEndDateTimestamp, nil
|
|
}
|
|
|
|
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
|
|
m.UnseedHistoryArchiveTorrent(communityID)
|
|
_, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return m.SeedHistoryArchiveTorrent(communityID)
|
|
}
|
|
|
|
func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
|
|
id := community.IDString()
|
|
_, exists := m.historyArchiveTasks[id]
|
|
|
|
if exists {
|
|
m.LogStdout("history archive tasks interval already in progres", zap.String("id", id))
|
|
return
|
|
}
|
|
|
|
cancel := make(chan struct{})
|
|
m.historyArchiveTasks[id] = cancel
|
|
m.historyArchiveTasksWaitGroup.Add(1)
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
m.LogStdout("starting history archive tasks interval", zap.String("id", id))
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
m.LogStdout("starting archive task...", zap.String("id", id))
|
|
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
|
|
if err != nil {
|
|
m.LogStdout("failed to get last archive end date", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// This means there are no waku messages for this community,
|
|
// so nothing to do here
|
|
m.LogStdout("couldn't determine archive start date - skipping")
|
|
continue
|
|
}
|
|
|
|
topics, err := m.GetCommunityChatsTopics(community.ID())
|
|
if err != nil {
|
|
m.LogStdout("failed to get community chat topics ", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
ts := time.Now().Unix()
|
|
to := time.Unix(ts, 0)
|
|
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
|
|
|
|
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
|
|
if err != nil {
|
|
m.LogStdout("failed to create and seed history archive", zap.Error(err))
|
|
continue
|
|
}
|
|
case <-cancel:
|
|
m.UnseedHistoryArchiveTorrent(community.ID())
|
|
delete(m.historyArchiveTasks, id)
|
|
m.historyArchiveTasksWaitGroup.Done()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) StopHistoryArchiveTasksIntervals() {
|
|
for _, t := range m.historyArchiveTasks {
|
|
close(t)
|
|
}
|
|
// Stoping archive interval tasks is async, so we need
|
|
// to wait for all of them to be closed before we shutdown
|
|
// the torrent client
|
|
m.historyArchiveTasksWaitGroup.Wait()
|
|
}
|
|
|
|
func (m *Manager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
|
|
task, ok := m.historyArchiveTasks[communityID.String()]
|
|
if ok {
|
|
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
|
|
close(task)
|
|
}
|
|
}
|
|
|
|
type EncodedArchiveData struct {
|
|
padding int
|
|
bytes []byte
|
|
}
|
|
|
|
func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
|
|
return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
|
|
}
|
|
|
|
func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
|
|
|
|
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
|
|
}
|
|
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
|
|
|
|
loadFromDB := len(msgs) == 0
|
|
|
|
from := startDate
|
|
to := from.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
|
|
archiveDir := m.torrentConfig.DataDir + "/" + communityID.String()
|
|
torrentDir := m.torrentConfig.TorrentDir
|
|
indexPath := archiveDir + "/index"
|
|
dataPath := archiveDir + "/data"
|
|
|
|
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
|
wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata)
|
|
archiveIDs := make([]string, 0)
|
|
|
|
if _, err := os.Stat(archiveDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(archiveDir, 0700)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
if _, err := os.Stat(torrentDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(torrentDir, 0700)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
_, err := os.Stat(indexPath)
|
|
if err == nil {
|
|
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
var offset uint64 = 0
|
|
|
|
for hash, metadata := range wakuMessageArchiveIndexProto.Archives {
|
|
offset = offset + metadata.Size
|
|
wakuMessageArchiveIndex[hash] = metadata
|
|
}
|
|
|
|
var encodedArchives []*EncodedArchiveData
|
|
topicsAsByteArrays := topicsAsByteArrays(topics)
|
|
|
|
m.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{
|
|
CommunityID: communityID.String(),
|
|
}})
|
|
|
|
m.LogStdout("creating archives",
|
|
zap.Any("startDate", startDate),
|
|
zap.Any("endDate", endDate),
|
|
zap.Duration("partition", partition),
|
|
)
|
|
for {
|
|
if from.Equal(endDate) || from.After(endDate) {
|
|
break
|
|
}
|
|
m.LogStdout("creating message archive",
|
|
zap.Any("from", from),
|
|
zap.Any("to", to),
|
|
)
|
|
|
|
var messages []types.Message
|
|
if loadFromDB {
|
|
messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
} else {
|
|
for _, msg := range msgs {
|
|
if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() {
|
|
messages = append(messages, *msg)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if len(messages) == 0 {
|
|
// No need to create an archive with zero messages
|
|
m.LogStdout("no messages in this partition")
|
|
from = to
|
|
to = to.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Not only do we partition messages, we also chunk them
|
|
// roughly by size, such that each chunk will not exceed a given
|
|
// size and archive data doesn't get too big
|
|
messageChunks := make([][]types.Message, 0)
|
|
currentChunkSize := 0
|
|
currentChunk := make([]types.Message, 0)
|
|
|
|
for _, msg := range messages {
|
|
msgSize := len(msg.Payload) + len(msg.Sig)
|
|
if msgSize > maxArchiveSizeInBytes {
|
|
// we drop messages this big
|
|
continue
|
|
}
|
|
|
|
if currentChunkSize+msgSize > maxArchiveSizeInBytes {
|
|
messageChunks = append(messageChunks, currentChunk)
|
|
currentChunk = make([]types.Message, 0)
|
|
currentChunkSize = 0
|
|
}
|
|
currentChunk = append(currentChunk, msg)
|
|
currentChunkSize = currentChunkSize + msgSize
|
|
}
|
|
messageChunks = append(messageChunks, currentChunk)
|
|
|
|
for _, messages := range messageChunks {
|
|
wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays)
|
|
encodedArchive, err := proto.Marshal(wakuMessageArchive)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
if encrypt {
|
|
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
encodedArchive, err = proto.Marshal(messageSpec.Message)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
rawSize := len(encodedArchive)
|
|
padding := 0
|
|
size := 0
|
|
|
|
if rawSize > pieceLength {
|
|
size = rawSize + pieceLength - (rawSize % pieceLength)
|
|
padding = size - rawSize
|
|
} else {
|
|
padding = pieceLength - rawSize
|
|
size = rawSize + padding
|
|
}
|
|
|
|
wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{
|
|
Metadata: wakuMessageArchive.Metadata,
|
|
Offset: offset,
|
|
Size: uint64(size),
|
|
Padding: uint64(padding),
|
|
}
|
|
|
|
wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()
|
|
archiveIDs = append(archiveIDs, archiveID)
|
|
wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata
|
|
encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding})
|
|
offset = offset + uint64(rawSize) + uint64(padding)
|
|
}
|
|
|
|
from = to
|
|
to = to.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
}
|
|
|
|
if len(encodedArchives) > 0 {
|
|
|
|
dataBytes := make([]byte, 0)
|
|
|
|
for _, encodedArchiveData := range encodedArchives {
|
|
dataBytes = append(dataBytes, encodedArchiveData.bytes...)
|
|
dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...)
|
|
}
|
|
|
|
wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex
|
|
indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
if encrypt {
|
|
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
indexBytes, err = proto.Marshal(messageSpec.Message)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
file, err := os.OpenFile(dataPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
defer file.Close()
|
|
|
|
_, err = file.Write(dataBytes)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfo := metainfo.MetaInfo{
|
|
AnnounceList: defaultAnnounceList,
|
|
}
|
|
metaInfo.SetDefaults()
|
|
metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey)
|
|
|
|
info := metainfo.Info{
|
|
PieceLength: int64(pieceLength),
|
|
}
|
|
|
|
err = info.BuildFromFilePath(archiveDir)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfo.InfoBytes, err = bencode.Marshal(info)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfoBytes, err := bencode.Marshal(metaInfo)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) // nolint: gosec
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(startDate.Unix()),
|
|
To: int(endDate.Unix()),
|
|
},
|
|
})
|
|
} else {
|
|
m.LogStdout("no archives created")
|
|
m.publish(&Subscription{
|
|
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(startDate.Unix()),
|
|
To: int(endDate.Unix()),
|
|
},
|
|
})
|
|
}
|
|
|
|
lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
if lastMessageArchiveEndDate > 0 {
|
|
err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
|
|
} else {
|
|
err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
|
|
}
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
return archiveIDs, nil
|
|
}
|
|
|
|
func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error {
|
|
m.UnseedHistoryArchiveTorrent(communityID)
|
|
|
|
id := communityID.String()
|
|
torrentFile := m.torrentFile(id)
|
|
|
|
metaInfo, err := metainfo.LoadFromFile(torrentFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
info, err := metaInfo.UnmarshalInfo()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hash := metaInfo.HashInfoBytes()
|
|
m.torrentTasks[id] = hash
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
torrent, err := m.torrentClient.AddTorrent(metaInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
torrent.DownloadAll()
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
|
|
CommunityID: communityID.String(),
|
|
},
|
|
})
|
|
|
|
magnetLink := metaInfo.Magnet(nil, &info).String()
|
|
|
|
m.LogStdout("seeding torrent", zap.String("id", id), zap.String("magnetLink", magnetLink))
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) UnseedHistoryArchiveTorrent(communityID types.HexBytes) {
|
|
id := communityID.String()
|
|
|
|
hash, exists := m.torrentTasks[id]
|
|
|
|
if exists {
|
|
torrent, ok := m.torrentClient.Torrent(hash)
|
|
if ok {
|
|
m.logger.Debug("Unseeding and dropping torrent for community: ", zap.Any("id", id))
|
|
torrent.Drop()
|
|
delete(m.torrentTasks, id)
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesUnseededSignal: &signal.HistoryArchivesUnseededSignal{
|
|
CommunityID: id,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) bool {
|
|
id := communityID.String()
|
|
hash := m.torrentTasks[id]
|
|
torrent, ok := m.torrentClient.Torrent(hash)
|
|
return ok && torrent.Seeding()
|
|
}
|
|
|
|
func (m *Manager) GetHistoryArchiveDownloadTask(communityID string) *HistoryArchiveDownloadTask {
|
|
return m.historyArchiveDownloadTasks[communityID]
|
|
}
|
|
|
|
func (m *Manager) DeleteHistoryArchiveDownloadTask(communityID string) {
|
|
delete(m.historyArchiveDownloadTasks, communityID)
|
|
}
|
|
|
|
func (m *Manager) AddHistoryArchiveDownloadTask(communityID string, task *HistoryArchiveDownloadTask) {
|
|
m.historyArchiveDownloadTasks[communityID] = task
|
|
}
|
|
|
|
type HistoryArchiveDownloadTaskInfo struct {
|
|
TotalDownloadedArchivesCount int
|
|
TotalArchivesCount int
|
|
Cancelled bool
|
|
}
|
|
|
|
func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string, cancelTask chan struct{}) (*HistoryArchiveDownloadTaskInfo, error) {
|
|
|
|
id := communityID.String()
|
|
|
|
ml, err := metainfo.ParseMagnetUri(magnetlink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.logger.Debug("adding torrent via magnetlink for community", zap.String("id", id), zap.String("magnetlink", magnetlink))
|
|
torrent, err := m.torrentClient.AddMagnet(magnetlink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
downloadTaskInfo := &HistoryArchiveDownloadTaskInfo{
|
|
TotalDownloadedArchivesCount: 0,
|
|
TotalArchivesCount: 0,
|
|
Cancelled: false,
|
|
}
|
|
|
|
m.torrentTasks[id] = ml.InfoHash
|
|
timeout := time.After(20 * time.Second)
|
|
|
|
m.LogStdout("fetching torrent info", zap.String("magnetlink", magnetlink))
|
|
select {
|
|
case <-timeout:
|
|
return nil, ErrTorrentTimedout
|
|
case <-cancelTask:
|
|
m.LogStdout("cancelled fetching torrent info")
|
|
downloadTaskInfo.Cancelled = true
|
|
return downloadTaskInfo, nil
|
|
case <-torrent.GotInfo():
|
|
|
|
files := torrent.Files()
|
|
|
|
i, ok := findIndexFile(files)
|
|
if !ok {
|
|
// We're dealing with a malformed torrent, so don't do anything
|
|
return nil, errors.New("malformed torrent data")
|
|
}
|
|
|
|
indexFile := files[i]
|
|
indexFile.Download()
|
|
|
|
m.LogStdout("downloading history archive index")
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cancelTask:
|
|
m.LogStdout("cancelled downloading archive index")
|
|
downloadTaskInfo.Cancelled = true
|
|
return downloadTaskInfo, nil
|
|
case <-ticker.C:
|
|
if indexFile.BytesCompleted() == indexFile.Length() {
|
|
|
|
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
existingArchiveIDs, err := m.persistence.GetDownloadedMessageArchiveIDs(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(existingArchiveIDs) == len(index.Archives) {
|
|
m.LogStdout("download cancelled, no new archives")
|
|
return downloadTaskInfo, nil
|
|
}
|
|
|
|
downloadTaskInfo.TotalDownloadedArchivesCount = len(existingArchiveIDs)
|
|
downloadTaskInfo.TotalArchivesCount = len(index.Archives)
|
|
|
|
archiveHashes := make(archiveMDSlice, 0, downloadTaskInfo.TotalArchivesCount)
|
|
|
|
for hash, metadata := range index.Archives {
|
|
archiveHashes = append(archiveHashes, &archiveMetadata{hash: hash, from: metadata.Metadata.From})
|
|
}
|
|
|
|
sort.Sort(sort.Reverse(archiveHashes))
|
|
|
|
m.publish(&Subscription{
|
|
DownloadingHistoryArchivesStartedSignal: &signal.DownloadingHistoryArchivesStartedSignal{
|
|
CommunityID: communityID.String(),
|
|
},
|
|
})
|
|
|
|
for _, hd := range archiveHashes {
|
|
|
|
hash := hd.hash
|
|
hasArchive := false
|
|
|
|
for _, existingHash := range existingArchiveIDs {
|
|
if existingHash == hash {
|
|
hasArchive = true
|
|
break
|
|
}
|
|
}
|
|
if hasArchive {
|
|
continue
|
|
}
|
|
|
|
metadata := index.Archives[hash]
|
|
startIndex := int(metadata.Offset) / pieceLength
|
|
endIndex := startIndex + int(metadata.Size)/pieceLength
|
|
|
|
downloadMsg := fmt.Sprintf("downloading data for message archive (%d/%d)", downloadTaskInfo.TotalDownloadedArchivesCount+1, downloadTaskInfo.TotalArchivesCount)
|
|
m.LogStdout(downloadMsg, zap.String("hash", hash))
|
|
m.LogStdout("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
|
|
torrent.DownloadPieces(startIndex, endIndex)
|
|
|
|
piecesCompleted := make(map[int]bool)
|
|
for i = startIndex; i < endIndex; i++ {
|
|
piecesCompleted[i] = false
|
|
}
|
|
|
|
psc := torrent.SubscribePieceStateChanges()
|
|
downloadTicker := time.NewTicker(1 * time.Second)
|
|
defer downloadTicker.Stop()
|
|
|
|
downloadLoop:
|
|
for {
|
|
select {
|
|
case <-downloadTicker.C:
|
|
done := true
|
|
for i = startIndex; i < endIndex; i++ {
|
|
piecesCompleted[i] = torrent.PieceState(i).Complete
|
|
if !piecesCompleted[i] {
|
|
done = false
|
|
}
|
|
}
|
|
if done {
|
|
psc.Close()
|
|
break downloadLoop
|
|
}
|
|
case <-cancelTask:
|
|
m.LogStdout("downloading archive data interrupted")
|
|
downloadTaskInfo.Cancelled = true
|
|
return downloadTaskInfo, nil
|
|
}
|
|
}
|
|
downloadTaskInfo.TotalDownloadedArchivesCount++
|
|
err = m.persistence.SaveMessageArchiveID(communityID, hash)
|
|
if err != nil {
|
|
m.LogStdout("couldn't save message archive ID", zap.Error(err))
|
|
continue
|
|
}
|
|
m.publish(&Subscription{
|
|
HistoryArchiveDownloadedSignal: &signal.HistoryArchiveDownloadedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(metadata.Metadata.From),
|
|
To: int(metadata.Metadata.To),
|
|
},
|
|
})
|
|
}
|
|
m.publish(&Subscription{
|
|
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
|
|
CommunityID: communityID.String(),
|
|
},
|
|
})
|
|
m.LogStdout("finished downloading archives")
|
|
return downloadTaskInfo, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
|
|
return m.persistence.GetMessageArchiveIDsToImport(communityID)
|
|
}
|
|
|
|
func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) {
|
|
id := communityID.String()
|
|
|
|
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dataFile, err := os.Open(m.archiveDataFile(id))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer dataFile.Close()
|
|
|
|
m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID))
|
|
metadata := index.Archives[archiveID]
|
|
|
|
_, err = dataFile.Seek(int64(metadata.Offset), 0)
|
|
if err != nil {
|
|
m.LogStdout("failed to seek archive data file", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
data := make([]byte, metadata.Size-metadata.Padding)
|
|
_, err = dataFile.Read(data)
|
|
if err != nil {
|
|
m.LogStdout("failed failed to read archive data", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
|
|
err = proto.Unmarshal(data, archive)
|
|
if err != nil {
|
|
// The archive data might eb encrypted so we try to decrypt instead first
|
|
var protocolMessage encryption.ProtocolMessage
|
|
err := proto.Unmarshal(data, &protocolMessage)
|
|
if err != nil {
|
|
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
pk, err := crypto.DecompressPubkey(communityID)
|
|
if err != nil {
|
|
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
|
|
if err != nil {
|
|
m.LogStdout("failed to decrypt message archive", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
|
|
if err != nil {
|
|
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
}
|
|
return archive.Messages, nil
|
|
}
|
|
|
|
func (m *Manager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {
|
|
return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported)
|
|
}
|
|
|
|
func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) {
|
|
id := communityID.String()
|
|
torrentFile := m.torrentFile(id)
|
|
|
|
metaInfo, err := metainfo.LoadFromFile(torrentFile)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
info, err := metaInfo.UnmarshalInfo()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return metaInfo.Magnet(nil, &info).String(), nil
|
|
}
|
|
|
|
func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
|
|
var wakuMessages []*protobuf.WakuMessage
|
|
|
|
for _, msg := range messages {
|
|
topic := types.TopicTypeToByteArray(msg.Topic)
|
|
wakuMessage := &protobuf.WakuMessage{
|
|
Sig: msg.Sig,
|
|
Timestamp: uint64(msg.Timestamp),
|
|
Topic: topic,
|
|
Payload: msg.Payload,
|
|
Padding: msg.Padding,
|
|
Hash: msg.Hash,
|
|
ThirdPartyId: msg.ThirdPartyID,
|
|
}
|
|
wakuMessages = append(wakuMessages, wakuMessage)
|
|
}
|
|
|
|
metadata := protobuf.WakuMessageArchiveMetadata{
|
|
From: uint64(from.Unix()),
|
|
To: uint64(to.Unix()),
|
|
ContentTopic: topics,
|
|
}
|
|
|
|
wakuMessageArchive := &protobuf.WakuMessageArchive{
|
|
Metadata: &metadata,
|
|
Messages: wakuMessages,
|
|
}
|
|
return wakuMessageArchive
|
|
}
|
|
|
|
func (m *Manager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
|
|
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
|
|
|
indexPath := m.archiveIndexFile(communityID.String())
|
|
indexData, err := os.ReadFile(indexPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 {
|
|
// This means we're dealing with an encrypted index file, so we have to decrypt it first
|
|
var protocolMessage encryption.ProtocolMessage
|
|
err := proto.Unmarshal(indexData, &protocolMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pk, err := crypto.DecompressPubkey(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return wakuMessageArchiveIndexProto, nil
|
|
}
|
|
|
|
func (m *Manager) TorrentFileExists(communityID string) bool {
|
|
_, err := os.Stat(m.torrentFile(communityID))
|
|
return err == nil
|
|
}
|
|
|
|
func (m *Manager) torrentFile(communityID string) string {
|
|
return m.torrentConfig.TorrentDir + "/" + communityID + ".torrent"
|
|
}
|
|
|
|
func (m *Manager) archiveIndexFile(communityID string) string {
|
|
return m.torrentConfig.DataDir + "/" + communityID + "/index"
|
|
}
|
|
|
|
func (m *Manager) archiveDataFile(communityID string) string {
|
|
return m.torrentConfig.DataDir + "/" + communityID + "/data"
|
|
}
|
|
|
|
func topicsAsByteArrays(topics []types.TopicType) [][]byte {
|
|
var topicsAsByteArrays [][]byte
|
|
for _, t := range topics {
|
|
topic := types.TopicTypeToByteArray(t)
|
|
topicsAsByteArrays = append(topicsAsByteArrays, topic)
|
|
}
|
|
return topicsAsByteArrays
|
|
}
|
|
|
|
func findIndexFile(files []*torrent.File) (index int, ok bool) {
|
|
for i, f := range files {
|
|
if f.DisplayPath() == "index" {
|
|
return i, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|