Pascal Precht eb2b4b7cdc refactor(Communities): ensure generated archives don't exceed max cap
This introduces an addition constraint to archive generation, in which the payload + signature size of all partitioned message that go into an archive should not exceed a certain
threshold.

This is to ensure that archives won't get too big when they are later read into memory.
2023-01-20 17:09:00 +01:00

2534 lines
69 KiB
Go

package communities
import (
"crypto/ecdsa"
"database/sql"
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/ens"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/signal"
)
var defaultAnnounceList = [][]string{
{"udp://tracker.opentrackr.org:1337/announce"},
{"udp://tracker.openbittorrent.com:6969/announce"},
}
var pieceLength = 100 * 1024
const maxArchiveSizeInBytes = 30000000
var ErrTorrentTimedout = errors.New("torrent has timed out")
type Manager struct {
persistence *Persistence
encryptor *encryption.Protocol
ensSubscription chan []*ens.VerificationRecord
subscriptions []chan *Subscription
ensVerifier *ens.Verifier
identity *ecdsa.PrivateKey
logger *zap.Logger
stdoutLogger *zap.Logger
transport *transport.Transport
quit chan struct{}
torrentConfig *params.TorrentConfig
torrentClient *torrent.Client
historyArchiveTasksWaitGroup sync.WaitGroup
historyArchiveTasks map[string]chan struct{}
torrentTasks map[string]metainfo.Hash
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
}
type HistoryArchiveDownloadTask struct {
Cancel chan struct{}
Waiter sync.WaitGroup
}
func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
if identity == nil {
return nil, errors.New("empty identity")
}
var err error
if logger == nil {
if logger, err = zap.NewDevelopment(); err != nil {
return nil, errors.Wrap(err, "failed to create a logger")
}
}
stdoutLogger, err := zap.NewDevelopment()
if err != nil {
return nil, errors.Wrap(err, "failed to create archive logger")
}
manager := &Manager{
logger: logger,
stdoutLogger: stdoutLogger,
encryptor: encryptor,
identity: identity,
quit: make(chan struct{}),
transport: transport,
torrentConfig: torrentConfig,
historyArchiveTasks: make(map[string]chan struct{}),
torrentTasks: make(map[string]metainfo.Hash),
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
persistence: &Persistence{
logger: logger,
db: db,
},
}
if verifier != nil {
sub := verifier.Subscribe()
manager.ensSubscription = sub
manager.ensVerifier = verifier
}
return manager, nil
}
func (m *Manager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}
type archiveMDSlice []*archiveMetadata
type archiveMetadata struct {
hash string
from uint64
}
func (md archiveMDSlice) Len() int {
return len(md)
}
func (md archiveMDSlice) Swap(i, j int) {
md[i], md[j] = md[j], md[i]
}
func (md archiveMDSlice) Less(i, j int) bool {
return md[i].from > md[j].from
}
type Subscription struct {
Community *Community
Invitations []*protobuf.CommunityInvitation
CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal
HistoryArchivesCreatedSignal *signal.HistoryArchivesCreatedSignal
NoHistoryArchivesCreatedSignal *signal.NoHistoryArchivesCreatedSignal
HistoryArchivesSeedingSignal *signal.HistoryArchivesSeedingSignal
HistoryArchivesUnseededSignal *signal.HistoryArchivesUnseededSignal
HistoryArchiveDownloadedSignal *signal.HistoryArchiveDownloadedSignal
DownloadingHistoryArchivesStartedSignal *signal.DownloadingHistoryArchivesStartedSignal
DownloadingHistoryArchivesFinishedSignal *signal.DownloadingHistoryArchivesFinishedSignal
ImportingHistoryArchiveMessagesSignal *signal.ImportingHistoryArchiveMessagesSignal
}
type CommunityResponse struct {
Community *Community `json:"community"`
Changes *CommunityChanges `json:"changes"`
}
func (m *Manager) Subscribe() chan *Subscription {
subscription := make(chan *Subscription, 100)
m.subscriptions = append(m.subscriptions, subscription)
return subscription
}
func (m *Manager) Start() error {
if m.ensVerifier != nil {
m.runENSVerificationLoop()
}
if m.torrentConfig != nil && m.torrentConfig.Enabled {
err := m.StartTorrentClient()
if err != nil {
m.LogStdout("couldn't start torrent client", zap.Error(err))
}
}
return nil
}
func (m *Manager) runENSVerificationLoop() {
go func() {
for {
select {
case <-m.quit:
m.logger.Debug("quitting ens verification loop")
return
case records, more := <-m.ensSubscription:
if !more {
m.logger.Debug("no more ens records, quitting")
return
}
m.logger.Info("received records", zap.Any("records", records))
}
}
}()
}
func (m *Manager) Stop() error {
close(m.quit)
for _, c := range m.subscriptions {
close(c)
}
m.StopTorrentClient()
return nil
}
func (m *Manager) SetTorrentConfig(config *params.TorrentConfig) {
m.torrentConfig = config
}
func (m *Manager) StartTorrentClient() error {
if m.torrentConfig == nil {
return fmt.Errorf("Can't start torrent client: missing torrentConfig")
}
if m.TorrentClientStarted() {
return nil
}
config := torrent.NewDefaultClientConfig()
config.SetListenAddr(":" + fmt.Sprint(m.torrentConfig.Port))
config.Seed = true
config.DataDir = m.torrentConfig.DataDir
if _, err := os.Stat(m.torrentConfig.DataDir); os.IsNotExist(err) {
err := os.MkdirAll(m.torrentConfig.DataDir, 0700)
if err != nil {
return err
}
}
m.logger.Info("Starting torrent client", zap.Any("port", m.torrentConfig.Port))
// Instantiating the client will make it bootstrap and listen eagerly,
// so no go routine is needed here
client, err := torrent.NewClient(config)
if err != nil {
return err
}
m.torrentClient = client
return nil
}
func (m *Manager) StopTorrentClient() []error {
if m.TorrentClientStarted() {
m.StopHistoryArchiveTasksIntervals()
m.logger.Info("Stopping torrent client")
errs := m.torrentClient.Close()
if len(errs) > 0 {
return errs
}
m.torrentClient = nil
}
return make([]error, 0)
}
func (m *Manager) TorrentClientStarted() bool {
return m.torrentClient != nil
}
func (m *Manager) publish(subscription *Subscription) {
for _, s := range m.subscriptions {
select {
case s <- subscription:
default:
m.logger.Warn("subscription channel full, dropping message")
}
}
}
func (m *Manager) All() ([]*Community, error) {
return m.persistence.AllCommunities(&m.identity.PublicKey)
}
type KnownCommunitiesResponse struct {
ContractCommunities []string `json:"contractCommunities"`
Descriptions map[string]*Community `json:"communities"`
UnknownCommunities []string `json:"unknownCommunities"`
}
func (m *Manager) GetStoredDescriptionForCommunities(communityIDs []types.HexBytes) (response *KnownCommunitiesResponse, err error) {
response = &KnownCommunitiesResponse{
Descriptions: make(map[string]*Community),
}
for i := range communityIDs {
communityID := communityIDs[i].String()
var community *Community
community, err = m.GetByID(communityIDs[i])
if err != nil {
return
}
response.ContractCommunities = append(response.ContractCommunities, communityID)
if community != nil {
response.Descriptions[community.IDString()] = community
} else {
response.UnknownCommunities = append(response.UnknownCommunities, communityID)
}
}
return
}
func (m *Manager) Joined() ([]*Community, error) {
return m.persistence.JoinedCommunities(&m.identity.PublicKey)
}
func (m *Manager) Spectated() ([]*Community, error) {
return m.persistence.SpectatedCommunities(&m.identity.PublicKey)
}
func (m *Manager) JoinedAndPendingCommunitiesWithRequests() ([]*Community, error) {
return m.persistence.JoinedAndPendingCommunitiesWithRequests(&m.identity.PublicKey)
}
func (m *Manager) DeletedCommunities() ([]*Community, error) {
return m.persistence.DeletedCommunities(&m.identity.PublicKey)
}
func (m *Manager) Created() ([]*Community, error) {
return m.persistence.CreatedCommunities(&m.identity.PublicKey)
}
// CreateCommunity takes a description, generates an ID for it, saves it and return it
func (m *Manager) CreateCommunity(request *requests.CreateCommunity, publish bool) (*Community, error) {
description, err := request.ToCommunityDescription()
if err != nil {
return nil, err
}
description.Members = make(map[string]*protobuf.CommunityMember)
description.Members[common.PubkeyToHex(&m.identity.PublicKey)] = &protobuf.CommunityMember{Roles: []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL}}
err = ValidateCommunityDescription(description)
if err != nil {
return nil, err
}
description.Clock = 1
key, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
config := Config{
ID: &key.PublicKey,
PrivateKey: key,
Logger: m.logger,
Joined: true,
MemberIdentity: &m.identity.PublicKey,
CommunityDescription: description,
}
community, err := New(config)
if err != nil {
return nil, err
}
// We join any community we create
community.Join()
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
if publish {
m.publish(&Subscription{Community: community})
}
return community, nil
}
func (m *Manager) DeleteCommunity(id types.HexBytes) error {
err := m.persistence.DeleteCommunity(id)
if err != nil {
return err
}
return m.persistence.DeleteCommunitySettings(id)
}
// EditCommunity takes a description, updates the community with the description,
// saves it and returns it
func (m *Manager) EditCommunity(request *requests.EditCommunity) (*Community, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
if !community.IsAdmin() {
return nil, errors.New("not an admin")
}
newDescription, err := request.ToCommunityDescription()
if err != nil {
return nil, fmt.Errorf("Can't create community description: %v", err)
}
// If permissions weren't explicitly set on original request, use existing ones
if newDescription.Permissions.Access == protobuf.CommunityPermissions_UNKNOWN_ACCESS {
newDescription.Permissions.Access = community.config.CommunityDescription.Permissions.Access
}
// Use existing images for the entries that were not updated
// NOTE: This will NOT allow deletion of the community image; it will need to
// be handled separately.
for imageName := range community.config.CommunityDescription.Identity.Images {
_, exists := newDescription.Identity.Images[imageName]
if !exists {
// If no image was set in ToCommunityDescription then Images is nil.
if newDescription.Identity.Images == nil {
newDescription.Identity.Images = make(map[string]*protobuf.IdentityImage)
}
newDescription.Identity.Images[imageName] = community.config.CommunityDescription.Identity.Images[imageName]
}
}
// TODO: handle delete image (if needed)
err = ValidateCommunityDescription(newDescription)
if err != nil {
return nil, err
}
// Edit the community values
community.Edit(newDescription)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error) {
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if !community.IsAdmin() {
return nil, errors.New("not an admin")
}
return community.config.PrivateKey, nil
}
func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
communityID := crypto.CompressPubkey(&key.PublicKey)
community, err := m.persistence.GetByID(&m.identity.PublicKey, communityID)
if err != nil {
return nil, err
}
if community == nil {
description := &protobuf.CommunityDescription{
Permissions: &protobuf.CommunityPermissions{},
}
config := Config{
ID: &key.PublicKey,
PrivateKey: key,
Logger: m.logger,
Joined: true,
MemberIdentity: &m.identity.PublicKey,
CommunityDescription: description,
}
community, err = New(config)
if err != nil {
return nil, err
}
} else {
community.config.PrivateKey = key
}
community.Join()
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
return community, nil
}
func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat, publish bool) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
chatID := uuid.New().String()
changes, err := community.CreateChat(chatID, chat)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
if publish {
m.publish(&Subscription{Community: community})
}
return community, changes, nil
}
func (m *Manager) EditChat(communityID types.HexBytes, chatID string, chat *protobuf.CommunityChat) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
// Remove communityID prefix from chatID if exists
if strings.HasPrefix(chatID, communityID.String()) {
chatID = strings.TrimPrefix(chatID, communityID.String())
}
changes, err := community.EditChat(chatID, chat)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) DeleteChat(communityID types.HexBytes, chatID string) (*Community, *protobuf.CommunityDescription, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
// Remove communityID prefix from chatID if exists
if strings.HasPrefix(chatID, communityID.String()) {
chatID = strings.TrimPrefix(chatID, communityID.String())
}
description, err := community.DeleteChat(chatID)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, description, nil
}
func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory, publish bool) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
categoryID := uuid.New().String()
// Remove communityID prefix from chatID if exists
for i, cid := range request.ChatIDs {
if strings.HasPrefix(cid, request.CommunityID.String()) {
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
}
}
changes, err := community.CreateCategory(categoryID, request.CategoryName, request.ChatIDs)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
if publish {
m.publish(&Subscription{Community: community})
}
return community, changes, nil
}
func (m *Manager) EditCategory(request *requests.EditCommunityCategory) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
// Remove communityID prefix from chatID if exists
for i, cid := range request.ChatIDs {
if strings.HasPrefix(cid, request.CommunityID.String()) {
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
}
}
changes, err := community.EditCategory(request.CategoryID, request.CategoryName, request.ChatIDs)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) EditChatFirstMessageTimestamp(communityID types.HexBytes, chatID string, timestamp uint32) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
// Remove communityID prefix from chatID if exists
if strings.HasPrefix(chatID, communityID.String()) {
chatID = strings.TrimPrefix(chatID, communityID.String())
}
changes, err := community.UpdateChatFirstMessageTimestamp(chatID, timestamp)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) ReorderCategories(request *requests.ReorderCommunityCategories) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
changes, err := community.ReorderCategories(request.CategoryID, request.Position)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) ReorderChat(request *requests.ReorderCommunityChat) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
// Remove communityID prefix from chatID if exists
if strings.HasPrefix(request.ChatID, request.CommunityID.String()) {
request.ChatID = strings.TrimPrefix(request.ChatID, request.CommunityID.String())
}
changes, err := community.ReorderChat(request.CategoryID, request.ChatID, request.Position)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) DeleteCategory(request *requests.DeleteCommunityCategory) (*Community, *CommunityChanges, error) {
community, err := m.GetByID(request.CommunityID)
if err != nil {
return nil, nil, err
}
if community == nil {
return nil, nil, ErrOrgNotFound
}
changes, err := community.DeleteCategory(request.CategoryID)
if err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
// Advertise changes
m.publish(&Subscription{Community: community})
return community, changes, nil
}
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte) (*CommunityResponse, error) {
id := crypto.CompressPubkey(signer)
community, err := m.persistence.GetByID(&m.identity.PublicKey, id)
if err != nil {
return nil, err
}
if community == nil {
config := Config{
CommunityDescription: description,
Logger: m.logger,
MarshaledCommunityDescription: payload,
MemberIdentity: &m.identity.PublicKey,
ID: signer,
}
community, err = New(config)
if err != nil {
return nil, err
}
}
changes, err := community.UpdateCommunityDescription(signer, description, payload)
if err != nil {
return nil, err
}
hasCommunityArchiveInfo, err := m.persistence.HasCommunityArchiveInfo(community.ID())
if err != nil {
return nil, err
}
cdMagnetlinkClock := community.config.CommunityDescription.ArchiveMagnetlinkClock
if !hasCommunityArchiveInfo {
err = m.persistence.SaveCommunityArchiveInfo(community.ID(), cdMagnetlinkClock, 0)
if err != nil {
return nil, err
}
} else {
magnetlinkClock, err := m.persistence.GetMagnetlinkMessageClock(community.ID())
if err != nil {
return nil, err
}
if cdMagnetlinkClock > magnetlinkClock {
err = m.persistence.UpdateMagnetlinkMessageClock(community.ID(), cdMagnetlinkClock)
if err != nil {
return nil, err
}
}
}
pkString := common.PubkeyToHex(&m.identity.PublicKey)
// If the community require membership, we set whether we should leave/join the community after a state change
if community.InvitationOnly() || community.OnRequest() || community.AcceptRequestToJoinAutomatically() {
if changes.HasNewMember(pkString) {
hasPendingRequest, err := m.persistence.HasPendingRequestsToJoinForUserAndCommunity(pkString, changes.Community.ID())
if err != nil {
return nil, err
}
// If there's any pending request, we should join the community
// automatically
changes.ShouldMemberJoin = hasPendingRequest
}
if changes.HasMemberLeft(pkString) {
// If we joined previously the community, we should leave it
changes.ShouldMemberLeave = community.Joined()
}
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
// We mark our requests as completed, though maybe we should mark
// any request for any user that has been added as completed
if err := m.markRequestToJoin(&m.identity.PublicKey, community); err != nil {
return nil, err
}
// Check if there's a change and we should be joining
return &CommunityResponse{
Community: community,
Changes: changes,
}, nil
}
// TODO: This is not fully implemented, we want to save the grant passed at
// this stage and make sure it's used when publishing.
func (m *Manager) HandleCommunityInvitation(signer *ecdsa.PublicKey, invitation *protobuf.CommunityInvitation, payload []byte) (*CommunityResponse, error) {
m.logger.Debug("Handling wrapped community description message")
community, err := m.HandleWrappedCommunityDescriptionMessage(payload)
if err != nil {
return nil, err
}
// Save grant
return community, nil
}
// markRequestToJoin marks all the pending requests to join as completed
// if we are members
func (m *Manager) markRequestToJoin(pk *ecdsa.PublicKey, community *Community) error {
if community.HasMember(pk) {
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(pk), community.ID(), RequestToJoinStateAccepted)
}
return nil
}
func (m *Manager) markRequestToJoinAsCanceled(pk *ecdsa.PublicKey, community *Community) error {
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(pk), community.ID(), RequestToJoinStateCanceled)
}
func (m *Manager) SetMuted(id types.HexBytes, muted bool) error {
return m.persistence.SetMuted(id, muted)
}
func (m *Manager) CancelRequestToJoin(request *requests.CancelRequestToJoinCommunity) (*RequestToJoin, *Community, error) {
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
if err != nil {
return nil, nil, err
}
community, err := m.GetByID(dbRequest.CommunityID)
if err != nil {
return nil, nil, err
}
pk, err := common.HexToPubkey(dbRequest.PublicKey)
if err != nil {
return nil, nil, err
}
if err := m.markRequestToJoinAsCanceled(pk, community); err != nil {
return nil, nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, nil, err
}
return dbRequest, community, nil
}
func (m *Manager) AcceptRequestToJoin(request *requests.AcceptRequestToJoinCommunity) (*Community, error) {
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
if err != nil {
return nil, err
}
community, err := m.GetByID(dbRequest.CommunityID)
if err != nil {
return nil, err
}
pk, err := common.HexToPubkey(dbRequest.PublicKey)
if err != nil {
return nil, err
}
err = community.AddMember(pk, []protobuf.CommunityMember_Roles{})
if err != nil {
return nil, err
}
if err := m.markRequestToJoin(pk, community); err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) GetRequestToJoin(ID types.HexBytes) (*RequestToJoin, error) {
return m.persistence.GetRequestToJoin(ID)
}
func (m *Manager) DeclineRequestToJoin(request *requests.DeclineRequestToJoinCommunity) error {
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
if err != nil {
return err
}
return m.persistence.SetRequestToJoinState(dbRequest.PublicKey, dbRequest.CommunityID, RequestToJoinStateDeclined)
}
func (m *Manager) HandleCommunityCancelRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityCancelRequestToJoin) (*RequestToJoin, error) {
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
err = m.markRequestToJoinAsCanceled(signer, community)
if err != nil {
return nil, err
}
requestToJoin, err := m.persistence.GetRequestToJoinByPk(common.PubkeyToHex(signer), community.ID(), RequestToJoinStateCanceled)
if err != nil {
return nil, err
}
return requestToJoin, nil
}
func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoin) (*RequestToJoin, error) {
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
if err := community.ValidateRequestToJoin(signer, request); err != nil {
return nil, err
}
requestToJoin := &RequestToJoin{
PublicKey: common.PubkeyToHex(signer),
Clock: request.Clock,
ENSName: request.EnsName,
CommunityID: request.CommunityId,
State: RequestToJoinStatePending,
}
requestToJoin.CalculateID()
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
return nil, err
}
// If user is already a member, then accept request automatically
// It may happen when member removes itself from community and then tries to rejoin
// More specifically, CommunityRequestToLeave may be delivered later than CommunityRequestToJoin, or not delivered at all
acceptAutomatically := community.AcceptRequestToJoinAutomatically() || community.HasMember(signer)
if acceptAutomatically {
err = m.markRequestToJoin(signer, community)
if err != nil {
return nil, err
}
requestToJoin.State = RequestToJoinStateAccepted
}
return requestToJoin, nil
}
func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoinResponse) error {
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
if err != nil {
return err
}
if community == nil {
return ErrOrgNotFound
}
communityDescriptionBytes, err := proto.Marshal(request.Community)
if err != nil {
return err
}
// We need to wrap `request.Community` in an `ApplicationMetadataMessage`
// of type `CommunityDescription` because `UpdateCommunityDescription` expects this.
//
// This is merely for marsheling/unmarsheling, hence we attaching a `Signature`
// is not needed.
metadataMessage := &protobuf.ApplicationMetadataMessage{
Payload: communityDescriptionBytes,
Type: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
}
appMetadataMsg, err := proto.Marshal(metadataMessage)
if err != nil {
return err
}
_, err = community.UpdateCommunityDescription(signer, request.Community, appMetadataMsg)
if err != nil {
return err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return err
}
if request.Accepted {
return m.markRequestToJoin(&m.identity.PublicKey, community)
}
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(&m.identity.PublicKey), community.ID(), RequestToJoinStateDeclined)
}
func (m *Manager) HandleCommunityRequestToLeave(signer *ecdsa.PublicKey, proto *protobuf.CommunityRequestToLeave) error {
requestToLeave := NewRequestToLeave(common.PubkeyToHex(signer), proto)
if err := m.persistence.SaveRequestToLeave(requestToLeave); err != nil {
return err
}
// Ensure corresponding requestToJoin clock is older than requestToLeave
requestToJoin, err := m.persistence.GetRequestToJoin(requestToLeave.ID)
if err != nil {
return err
}
if requestToJoin.Clock > requestToLeave.Clock {
return ErrOldRequestToLeave
}
return nil
}
func (m *Manager) HandleWrappedCommunityDescriptionMessage(payload []byte) (*CommunityResponse, error) {
m.logger.Debug("Handling wrapped community description message")
applicationMetadataMessage := &protobuf.ApplicationMetadataMessage{}
err := proto.Unmarshal(payload, applicationMetadataMessage)
if err != nil {
return nil, err
}
if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
return nil, ErrInvalidMessage
}
signer, err := applicationMetadataMessage.RecoverKey()
if err != nil {
return nil, err
}
description := &protobuf.CommunityDescription{}
err = proto.Unmarshal(applicationMetadataMessage.Payload, description)
if err != nil {
return nil, err
}
return m.HandleCommunityDescriptionMessage(signer, description, payload)
}
func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) {
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
community.Join()
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
return community, nil
}
func (m *Manager) SpectateCommunity(id types.HexBytes) (*Community, error) {
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
community.Spectate()
if err = m.persistence.SaveCommunity(community); err != nil {
return nil, err
}
return community, nil
}
func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
return m.persistence.GetMagnetlinkMessageClock(communityID)
}
func (m *Manager) GetRequestToJoinIDByPkAndCommunityID(pk *ecdsa.PublicKey, communityID []byte) ([]byte, error) {
return m.persistence.GetRequestToJoinIDByPkAndCommunityID(common.PubkeyToHex(pk), communityID)
}
func (m *Manager) UpdateCommunityDescriptionMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
community, err := m.GetByIDString(communityID.String())
if err != nil {
return err
}
community.config.CommunityDescription.ArchiveMagnetlinkClock = clock
return m.persistence.SaveCommunity(community)
}
func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock)
}
func (m *Manager) UpdateLastSeenMagnetlink(communityID types.HexBytes, magnetlinkURI string) error {
return m.persistence.UpdateLastSeenMagnetlink(communityID, magnetlinkURI)
}
func (m *Manager) GetLastSeenMagnetlink(communityID types.HexBytes) (string, error) {
return m.persistence.GetLastSeenMagnetlink(communityID)
}
func (m *Manager) LeaveCommunity(id types.HexBytes) (*Community, error) {
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
community.RemoveOurselvesFromOrg(&m.identity.PublicKey)
community.Leave()
if err = m.persistence.SaveCommunity(community); err != nil {
return nil, err
}
return community, nil
}
func (m *Manager) inviteUsersToCommunity(community *Community, pks []*ecdsa.PublicKey) (*Community, error) {
var invitations []*protobuf.CommunityInvitation
for _, pk := range pks {
invitation, err := community.InviteUserToOrg(pk)
if err != nil {
return nil, err
}
// We mark the user request (if any) as completed
if err := m.markRequestToJoin(pk, community); err != nil {
return nil, err
}
invitations = append(invitations, invitation)
}
err := m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community, Invitations: invitations})
return community, nil
}
func (m *Manager) InviteUsersToCommunity(communityID types.HexBytes, pks []*ecdsa.PublicKey) (*Community, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
return m.inviteUsersToCommunity(community, pks)
}
func (m *Manager) AddMemberOwnerToCommunity(communityID types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
community, err := m.GetByID(communityID)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
err = community.AddMember(pk, []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL})
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) RemoveUserFromCommunity(id types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
_, err = community.RemoveUserFromOrg(pk)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) UnbanUserFromCommunity(request *requests.UnbanUserFromCommunity) (*Community, error) {
id := request.CommunityID
publicKey, err := common.HexToPubkey(request.User.String())
if err != nil {
return nil, err
}
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
_, err = community.UnbanUserFromCommunity(publicKey)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) AddRoleToMember(request *requests.AddRoleToMember) (*Community, error) {
id := request.CommunityID
publicKey, err := common.HexToPubkey(request.User.String())
if err != nil {
return nil, err
}
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
if !community.hasMember(publicKey) {
return nil, ErrMemberNotFound
}
_, err = community.AddRoleToMember(publicKey, request.Role)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) RemoveRoleFromMember(request *requests.RemoveRoleFromMember) (*Community, error) {
id := request.CommunityID
publicKey, err := common.HexToPubkey(request.User.String())
if err != nil {
return nil, err
}
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
if !community.hasMember(publicKey) {
return nil, ErrMemberNotFound
}
_, err = community.RemoveRoleFromMember(publicKey, request.Role)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) BanUserFromCommunity(request *requests.BanUserFromCommunity) (*Community, error) {
id := request.CommunityID
publicKey, err := common.HexToPubkey(request.User.String())
if err != nil {
return nil, err
}
community, err := m.GetByID(id)
if err != nil {
return nil, err
}
if community == nil {
return nil, ErrOrgNotFound
}
_, err = community.BanUserFromCommunity(publicKey)
if err != nil {
return nil, err
}
err = m.persistence.SaveCommunity(community)
if err != nil {
return nil, err
}
m.publish(&Subscription{Community: community})
return community, nil
}
func (m *Manager) GetByID(id []byte) (*Community, error) {
return m.persistence.GetByID(&m.identity.PublicKey, id)
}
func (m *Manager) GetByIDString(idString string) (*Community, error) {
id, err := types.DecodeHex(idString)
if err != nil {
return nil, err
}
return m.GetByID(id)
}
func (m *Manager) RequestToJoin(requester *ecdsa.PublicKey, request *requests.RequestToJoinCommunity) (*Community, *RequestToJoin, error) {
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityID)
if err != nil {
return nil, nil, err
}
// We don't allow requesting access if already joined
if community.Joined() {
return nil, nil, ErrAlreadyJoined
}
clock := uint64(time.Now().Unix())
requestToJoin := &RequestToJoin{
PublicKey: common.PubkeyToHex(requester),
Clock: clock,
ENSName: request.ENSName,
CommunityID: request.CommunityID,
State: RequestToJoinStatePending,
Our: true,
}
requestToJoin.CalculateID()
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
return nil, nil, err
}
community.config.RequestedToJoinAt = uint64(time.Now().Unix())
community.AddRequestToJoin(requestToJoin)
return community, requestToJoin, nil
}
func (m *Manager) SaveRequestToJoin(request *RequestToJoin) error {
return m.persistence.SaveRequestToJoin(request)
}
func (m *Manager) CanceledRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*RequestToJoin, error) {
return m.persistence.CanceledRequestsToJoinForUser(common.PubkeyToHex(pk))
}
func (m *Manager) PendingRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*RequestToJoin, error) {
return m.persistence.PendingRequestsToJoinForUser(common.PubkeyToHex(pk))
}
func (m *Manager) PendingRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
m.logger.Info("fetching pending invitations", zap.String("community-id", id.String()))
return m.persistence.PendingRequestsToJoinForCommunity(id)
}
func (m *Manager) DeclinedRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
m.logger.Info("fetching declined invitations", zap.String("community-id", id.String()))
return m.persistence.DeclinedRequestsToJoinForCommunity(id)
}
func (m *Manager) CanceledRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
m.logger.Info("fetching canceled invitations", zap.String("community-id", id.String()))
return m.persistence.CanceledRequestsToJoinForCommunity(id)
}
func (m *Manager) CanPost(pk *ecdsa.PublicKey, communityID string, chatID string, grant []byte) (bool, error) {
community, err := m.GetByIDString(communityID)
if err != nil {
return false, err
}
if community == nil {
return false, nil
}
return community.CanPost(pk, chatID, grant)
}
func (m *Manager) IsEncrypted(communityID string) (bool, error) {
community, err := m.GetByIDString(communityID)
if err != nil {
return false, err
}
return community.Encrypted(), nil
}
func (m *Manager) ShouldHandleSyncCommunity(community *protobuf.SyncCommunity) (bool, error) {
return m.persistence.ShouldHandleSyncCommunity(community)
}
func (m *Manager) ShouldHandleSyncCommunitySettings(communitySettings *protobuf.SyncCommunitySettings) (bool, error) {
return m.persistence.ShouldHandleSyncCommunitySettings(communitySettings)
}
func (m *Manager) HandleSyncCommunitySettings(syncCommunitySettings *protobuf.SyncCommunitySettings) (*CommunitySettings, error) {
id, err := types.DecodeHex(syncCommunitySettings.CommunityId)
if err != nil {
return nil, err
}
settings, err := m.persistence.GetCommunitySettingsByID(id)
if err != nil {
return nil, err
}
if settings == nil {
settings = &CommunitySettings{
CommunityID: syncCommunitySettings.CommunityId,
HistoryArchiveSupportEnabled: syncCommunitySettings.HistoryArchiveSupportEnabled,
Clock: syncCommunitySettings.Clock,
}
}
if syncCommunitySettings.Clock > settings.Clock {
settings.CommunityID = syncCommunitySettings.CommunityId
settings.HistoryArchiveSupportEnabled = syncCommunitySettings.HistoryArchiveSupportEnabled
settings.Clock = syncCommunitySettings.Clock
}
err = m.persistence.SaveCommunitySettings(*settings)
if err != nil {
return nil, err
}
return settings, nil
}
func (m *Manager) SetSyncClock(id []byte, clock uint64) error {
return m.persistence.SetSyncClock(id, clock)
}
func (m *Manager) SetPrivateKey(id []byte, privKey *ecdsa.PrivateKey) error {
return m.persistence.SetPrivateKey(id, privKey)
}
func (m *Manager) GetSyncedRawCommunity(id []byte) (*RawCommunityRow, error) {
return m.persistence.getSyncedRawCommunity(id)
}
func (m *Manager) GetCommunitySettingsByID(id types.HexBytes) (*CommunitySettings, error) {
return m.persistence.GetCommunitySettingsByID(id)
}
func (m *Manager) GetCommunitiesSettings() ([]CommunitySettings, error) {
return m.persistence.GetCommunitiesSettings()
}
func (m *Manager) SaveCommunitySettings(settings CommunitySettings) error {
return m.persistence.SaveCommunitySettings(settings)
}
func (m *Manager) CommunitySettingsExist(id types.HexBytes) (bool, error) {
return m.persistence.CommunitySettingsExist(id)
}
func (m *Manager) DeleteCommunitySettings(id types.HexBytes) error {
return m.persistence.DeleteCommunitySettings(id)
}
func (m *Manager) UpdateCommunitySettings(settings CommunitySettings) error {
return m.persistence.UpdateCommunitySettings(settings)
}
func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) {
adminCommunities, err := m.Created()
if err != nil {
return nil, err
}
chatIDs := make(map[string]bool)
for _, c := range adminCommunities {
if c.Joined() {
for _, id := range c.ChatIDs() {
chatIDs[id] = true
}
}
}
return chatIDs, nil
}
func (m *Manager) IsAdminCommunityByID(communityID types.HexBytes) (bool, error) {
pubKey, err := crypto.DecompressPubkey(communityID)
if err != nil {
return false, err
}
return m.IsAdminCommunity(pubKey)
}
func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
adminCommunities, err := m.Created()
if err != nil {
return false, err
}
for _, c := range adminCommunities {
if c.PrivateKey().PublicKey.Equal(pubKey) {
return true, nil
}
}
return false, nil
}
func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
community, err := m.GetByID(crypto.CompressPubkey(pubKey))
if err != nil {
return false, err
}
return community != nil && community.Joined(), nil
}
func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
if err != nil {
return nil, err
}
filters := []*transport.Filter{}
for _, cid := range chatIDs {
filters = append(filters, m.transport.FilterByChatID(cid))
}
return filters, nil
}
func (m *Manager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.TopicType, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
return nil, err
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.Topic)
}
return topics, nil
}
func (m *Manager) StoreWakuMessage(message *types.Message) error {
return m.persistence.SaveWakuMessage(message)
}
func (m *Manager) StoreWakuMessages(messages []*types.Message) error {
return m.persistence.SaveWakuMessages(messages)
}
func (m *Manager) GetLatestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
return m.persistence.GetLatestWakuMessageTimestamp(topics)
}
func (m *Manager) GetOldestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
return m.persistence.GetOldestWakuMessageTimestamp(topics)
}
func (m *Manager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint64, error) {
return m.persistence.GetLastMessageArchiveEndDate(communityID)
}
func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
m.LogStdout("failed to get community chats filters", zap.Error(err))
return 0, err
}
if len(filters) == 0 {
// If we don't have chat filters, we likely don't have any chats
// associated to this community, which means there's nothing more
// to do here
return 0, nil
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.Topic)
}
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// If we don't have a tracked last message archive end date, it
// means we haven't created an archive before, which means
// the next thing to look at is the oldest waku message timestamp for
// this community
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
if err != nil {
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// This means there's no waku message stored for this community so far
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
return 0, nil
}
}
return lastArchiveEndDateTimestamp, nil
}
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
m.UnseedHistoryArchiveTorrent(communityID)
_, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
if err != nil {
return err
}
return m.SeedHistoryArchiveTorrent(communityID)
}
func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString()
_, exists := m.historyArchiveTasks[id]
if exists {
m.LogStdout("history archive tasks interval already in progres", zap.String("id", id))
return
}
cancel := make(chan struct{})
m.historyArchiveTasks[id] = cancel
m.historyArchiveTasksWaitGroup.Add(1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
m.LogStdout("starting history archive tasks interval", zap.String("id", id))
for {
select {
case <-ticker.C:
m.LogStdout("starting archive task...", zap.String("id", id))
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
continue
}
if lastArchiveEndDateTimestamp == 0 {
// This means there are no waku messages for this community,
// so nothing to do here
m.LogStdout("couldn't determine archive start date - skipping")
continue
}
topics, err := m.GetCommunityChatsTopics(community.ID())
if err != nil {
m.LogStdout("failed to get community chat topics ", zap.Error(err))
continue
}
ts := time.Now().Unix()
to := time.Unix(ts, 0)
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
if err != nil {
m.LogStdout("failed to create and seed history archive", zap.Error(err))
continue
}
case <-cancel:
m.UnseedHistoryArchiveTorrent(community.ID())
delete(m.historyArchiveTasks, id)
m.historyArchiveTasksWaitGroup.Done()
return
}
}
}
func (m *Manager) StopHistoryArchiveTasksIntervals() {
for _, t := range m.historyArchiveTasks {
close(t)
}
// Stoping archive interval tasks is async, so we need
// to wait for all of them to be closed before we shutdown
// the torrent client
m.historyArchiveTasksWaitGroup.Wait()
}
func (m *Manager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
task, ok := m.historyArchiveTasks[communityID.String()]
if ok {
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
close(task)
}
}
type EncodedArchiveData struct {
padding int
bytes []byte
}
func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
}
func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
}
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
loadFromDB := len(msgs) == 0
from := startDate
to := from.Add(partition)
if to.After(endDate) {
to = endDate
}
archiveDir := m.torrentConfig.DataDir + "/" + communityID.String()
torrentDir := m.torrentConfig.TorrentDir
indexPath := archiveDir + "/index"
dataPath := archiveDir + "/data"
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata)
archiveIDs := make([]string, 0)
if _, err := os.Stat(archiveDir); os.IsNotExist(err) {
err := os.MkdirAll(archiveDir, 0700)
if err != nil {
return archiveIDs, err
}
}
if _, err := os.Stat(torrentDir); os.IsNotExist(err) {
err := os.MkdirAll(torrentDir, 0700)
if err != nil {
return archiveIDs, err
}
}
_, err := os.Stat(indexPath)
if err == nil {
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return archiveIDs, err
}
}
var offset uint64 = 0
for hash, metadata := range wakuMessageArchiveIndexProto.Archives {
offset = offset + metadata.Size
wakuMessageArchiveIndex[hash] = metadata
}
var encodedArchives []*EncodedArchiveData
topicsAsByteArrays := topicsAsByteArrays(topics)
m.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{
CommunityID: communityID.String(),
}})
m.LogStdout("creating archives",
zap.Any("startDate", startDate),
zap.Any("endDate", endDate),
zap.Duration("partition", partition),
)
for {
if from.Equal(endDate) || from.After(endDate) {
break
}
m.LogStdout("creating message archive",
zap.Any("from", from),
zap.Any("to", to),
)
var messages []types.Message
if loadFromDB {
messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
if err != nil {
return archiveIDs, err
}
} else {
for _, msg := range msgs {
if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() {
messages = append(messages, *msg)
}
}
}
if len(messages) == 0 {
// No need to create an archive with zero messages
m.LogStdout("no messages in this partition")
from = to
to = to.Add(partition)
if to.After(endDate) {
to = endDate
}
continue
}
// Not only do we partition messages, we also chunk them
// roughly by size, such that each chunk will not exceed a given
// size and archive data doesn't get too big
messageChunks := make([][]types.Message, 0)
currentChunkSize := 0
currentChunk := make([]types.Message, 0)
for _, msg := range messages {
msgSize := len(msg.Payload) + len(msg.Sig)
if msgSize > maxArchiveSizeInBytes {
// we drop messages this big
continue
}
if currentChunkSize+msgSize > maxArchiveSizeInBytes {
messageChunks = append(messageChunks, currentChunk)
currentChunk = make([]types.Message, 0)
currentChunkSize = 0
}
currentChunk = append(currentChunk, msg)
currentChunkSize = currentChunkSize + msgSize
}
messageChunks = append(messageChunks, currentChunk)
for _, messages := range messageChunks {
wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays)
encodedArchive, err := proto.Marshal(wakuMessageArchive)
if err != nil {
return archiveIDs, err
}
if encrypt {
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive)
if err != nil {
return archiveIDs, err
}
encodedArchive, err = proto.Marshal(messageSpec.Message)
if err != nil {
return archiveIDs, err
}
}
rawSize := len(encodedArchive)
padding := 0
size := 0
if rawSize > pieceLength {
size = rawSize + pieceLength - (rawSize % pieceLength)
padding = size - rawSize
} else {
padding = pieceLength - rawSize
size = rawSize + padding
}
wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{
Metadata: wakuMessageArchive.Metadata,
Offset: offset,
Size: uint64(size),
Padding: uint64(padding),
}
wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata)
if err != nil {
return archiveIDs, err
}
archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()
archiveIDs = append(archiveIDs, archiveID)
wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata
encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding})
offset = offset + uint64(rawSize) + uint64(padding)
}
from = to
to = to.Add(partition)
if to.After(endDate) {
to = endDate
}
}
if len(encodedArchives) > 0 {
dataBytes := make([]byte, 0)
for _, encodedArchiveData := range encodedArchives {
dataBytes = append(dataBytes, encodedArchiveData.bytes...)
dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...)
}
wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex
indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto)
if err != nil {
return archiveIDs, err
}
if encrypt {
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes)
if err != nil {
return archiveIDs, err
}
indexBytes, err = proto.Marshal(messageSpec.Message)
if err != nil {
return archiveIDs, err
}
}
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
if err != nil {
return archiveIDs, err
}
file, err := os.OpenFile(dataPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return archiveIDs, err
}
defer file.Close()
_, err = file.Write(dataBytes)
if err != nil {
return archiveIDs, err
}
metaInfo := metainfo.MetaInfo{
AnnounceList: defaultAnnounceList,
}
metaInfo.SetDefaults()
metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey)
info := metainfo.Info{
PieceLength: int64(pieceLength),
}
err = info.BuildFromFilePath(archiveDir)
if err != nil {
return archiveIDs, err
}
metaInfo.InfoBytes, err = bencode.Marshal(info)
if err != nil {
return archiveIDs, err
}
metaInfoBytes, err := bencode.Marshal(metaInfo)
if err != nil {
return archiveIDs, err
}
err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) // nolint: gosec
if err != nil {
return archiveIDs, err
}
m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))
m.publish(&Subscription{
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
From: int(startDate.Unix()),
To: int(endDate.Unix()),
},
})
} else {
m.LogStdout("no archives created")
m.publish(&Subscription{
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
From: int(startDate.Unix()),
To: int(endDate.Unix()),
},
})
}
lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID)
if err != nil {
return archiveIDs, err
}
if lastMessageArchiveEndDate > 0 {
err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
} else {
err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
}
if err != nil {
return archiveIDs, err
}
return archiveIDs, nil
}
func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error {
m.UnseedHistoryArchiveTorrent(communityID)
id := communityID.String()
torrentFile := m.torrentFile(id)
metaInfo, err := metainfo.LoadFromFile(torrentFile)
if err != nil {
return err
}
info, err := metaInfo.UnmarshalInfo()
if err != nil {
return err
}
hash := metaInfo.HashInfoBytes()
m.torrentTasks[id] = hash
if err != nil {
return err
}
torrent, err := m.torrentClient.AddTorrent(metaInfo)
if err != nil {
return err
}
torrent.DownloadAll()
m.publish(&Subscription{
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
CommunityID: communityID.String(),
},
})
magnetLink := metaInfo.Magnet(nil, &info).String()
m.LogStdout("seeding torrent", zap.String("id", id), zap.String("magnetLink", magnetLink))
return nil
}
func (m *Manager) UnseedHistoryArchiveTorrent(communityID types.HexBytes) {
id := communityID.String()
hash, exists := m.torrentTasks[id]
if exists {
torrent, ok := m.torrentClient.Torrent(hash)
if ok {
m.logger.Debug("Unseeding and dropping torrent for community: ", zap.Any("id", id))
torrent.Drop()
delete(m.torrentTasks, id)
m.publish(&Subscription{
HistoryArchivesUnseededSignal: &signal.HistoryArchivesUnseededSignal{
CommunityID: id,
},
})
}
}
}
func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) bool {
id := communityID.String()
hash := m.torrentTasks[id]
torrent, ok := m.torrentClient.Torrent(hash)
return ok && torrent.Seeding()
}
func (m *Manager) GetHistoryArchiveDownloadTask(communityID string) *HistoryArchiveDownloadTask {
return m.historyArchiveDownloadTasks[communityID]
}
func (m *Manager) DeleteHistoryArchiveDownloadTask(communityID string) {
delete(m.historyArchiveDownloadTasks, communityID)
}
func (m *Manager) AddHistoryArchiveDownloadTask(communityID string, task *HistoryArchiveDownloadTask) {
m.historyArchiveDownloadTasks[communityID] = task
}
type HistoryArchiveDownloadTaskInfo struct {
TotalDownloadedArchivesCount int
TotalArchivesCount int
Cancelled bool
}
func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string, cancelTask chan struct{}) (*HistoryArchiveDownloadTaskInfo, error) {
id := communityID.String()
ml, err := metainfo.ParseMagnetUri(magnetlink)
if err != nil {
return nil, err
}
m.logger.Debug("adding torrent via magnetlink for community", zap.String("id", id), zap.String("magnetlink", magnetlink))
torrent, err := m.torrentClient.AddMagnet(magnetlink)
if err != nil {
return nil, err
}
downloadTaskInfo := &HistoryArchiveDownloadTaskInfo{
TotalDownloadedArchivesCount: 0,
TotalArchivesCount: 0,
Cancelled: false,
}
m.torrentTasks[id] = ml.InfoHash
timeout := time.After(20 * time.Second)
m.LogStdout("fetching torrent info", zap.String("magnetlink", magnetlink))
select {
case <-timeout:
return nil, ErrTorrentTimedout
case <-cancelTask:
m.LogStdout("cancelled fetching torrent info")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
case <-torrent.GotInfo():
files := torrent.Files()
i, ok := findIndexFile(files)
if !ok {
// We're dealing with a malformed torrent, so don't do anything
return nil, errors.New("malformed torrent data")
}
indexFile := files[i]
indexFile.Download()
m.LogStdout("downloading history archive index")
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-cancelTask:
m.LogStdout("cancelled downloading archive index")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
case <-ticker.C:
if indexFile.BytesCompleted() == indexFile.Length() {
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return nil, err
}
existingArchiveIDs, err := m.persistence.GetDownloadedMessageArchiveIDs(communityID)
if err != nil {
return nil, err
}
if len(existingArchiveIDs) == len(index.Archives) {
m.LogStdout("download cancelled, no new archives")
return downloadTaskInfo, nil
}
downloadTaskInfo.TotalDownloadedArchivesCount = len(existingArchiveIDs)
downloadTaskInfo.TotalArchivesCount = len(index.Archives)
archiveHashes := make(archiveMDSlice, 0, downloadTaskInfo.TotalArchivesCount)
for hash, metadata := range index.Archives {
archiveHashes = append(archiveHashes, &archiveMetadata{hash: hash, from: metadata.Metadata.From})
}
sort.Sort(sort.Reverse(archiveHashes))
m.publish(&Subscription{
DownloadingHistoryArchivesStartedSignal: &signal.DownloadingHistoryArchivesStartedSignal{
CommunityID: communityID.String(),
},
})
for _, hd := range archiveHashes {
hash := hd.hash
hasArchive := false
for _, existingHash := range existingArchiveIDs {
if existingHash == hash {
hasArchive = true
break
}
}
if hasArchive {
continue
}
metadata := index.Archives[hash]
startIndex := int(metadata.Offset) / pieceLength
endIndex := startIndex + int(metadata.Size)/pieceLength
downloadMsg := fmt.Sprintf("downloading data for message archive (%d/%d)", downloadTaskInfo.TotalDownloadedArchivesCount+1, downloadTaskInfo.TotalArchivesCount)
m.LogStdout(downloadMsg, zap.String("hash", hash))
m.LogStdout("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
torrent.DownloadPieces(startIndex, endIndex)
piecesCompleted := make(map[int]bool)
for i = startIndex; i < endIndex; i++ {
piecesCompleted[i] = false
}
psc := torrent.SubscribePieceStateChanges()
downloadTicker := time.NewTicker(1 * time.Second)
defer downloadTicker.Stop()
downloadLoop:
for {
select {
case <-downloadTicker.C:
done := true
for i = startIndex; i < endIndex; i++ {
piecesCompleted[i] = torrent.PieceState(i).Complete
if !piecesCompleted[i] {
done = false
}
}
if done {
psc.Close()
break downloadLoop
}
case <-cancelTask:
m.LogStdout("downloading archive data interrupted")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
}
}
downloadTaskInfo.TotalDownloadedArchivesCount++
err = m.persistence.SaveMessageArchiveID(communityID, hash)
if err != nil {
m.LogStdout("couldn't save message archive ID", zap.Error(err))
continue
}
m.publish(&Subscription{
HistoryArchiveDownloadedSignal: &signal.HistoryArchiveDownloadedSignal{
CommunityID: communityID.String(),
From: int(metadata.Metadata.From),
To: int(metadata.Metadata.To),
},
})
}
m.publish(&Subscription{
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
CommunityID: communityID.String(),
},
})
m.LogStdout("finished downloading archives")
return downloadTaskInfo, nil
}
}
}
}
}
func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
return m.persistence.GetMessageArchiveIDsToImport(communityID)
}
func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) {
id := communityID.String()
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return nil, err
}
dataFile, err := os.Open(m.archiveDataFile(id))
if err != nil {
return nil, err
}
defer dataFile.Close()
m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID))
metadata := index.Archives[archiveID]
_, err = dataFile.Seek(int64(metadata.Offset), 0)
if err != nil {
m.LogStdout("failed to seek archive data file", zap.Error(err))
return nil, err
}
data := make([]byte, metadata.Size-metadata.Padding)
_, err = dataFile.Read(data)
if err != nil {
m.LogStdout("failed failed to read archive data", zap.Error(err))
return nil, err
}
archive := &protobuf.WakuMessageArchive{}
err = proto.Unmarshal(data, archive)
if err != nil {
// The archive data might eb encrypted so we try to decrypt instead first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(data, &protocolMessage)
if err != nil {
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
return nil, err
}
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
return nil, err
}
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
if err != nil {
m.LogStdout("failed to decrypt message archive", zap.Error(err))
return nil, err
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
return nil, err
}
}
return archive.Messages, nil
}
func (m *Manager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {
return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported)
}
func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) {
id := communityID.String()
torrentFile := m.torrentFile(id)
metaInfo, err := metainfo.LoadFromFile(torrentFile)
if err != nil {
return "", err
}
info, err := metaInfo.UnmarshalInfo()
if err != nil {
return "", err
}
return metaInfo.Magnet(nil, &info).String(), nil
}
func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
var wakuMessages []*protobuf.WakuMessage
for _, msg := range messages {
topic := types.TopicTypeToByteArray(msg.Topic)
wakuMessage := &protobuf.WakuMessage{
Sig: msg.Sig,
Timestamp: uint64(msg.Timestamp),
Topic: topic,
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
ThirdPartyId: msg.ThirdPartyID,
}
wakuMessages = append(wakuMessages, wakuMessage)
}
metadata := protobuf.WakuMessageArchiveMetadata{
From: uint64(from.Unix()),
To: uint64(to.Unix()),
ContentTopic: topics,
}
wakuMessageArchive := &protobuf.WakuMessageArchive{
Metadata: &metadata,
Messages: wakuMessages,
}
return wakuMessageArchive
}
func (m *Manager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
indexPath := m.archiveIndexFile(communityID.String())
indexData, err := os.ReadFile(indexPath)
if err != nil {
return nil, err
}
err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto)
if err != nil {
return nil, err
}
if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 {
// This means we're dealing with an encrypted index file, so we have to decrypt it first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(indexData, &protocolMessage)
if err != nil {
return nil, err
}
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
return nil, err
}
decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0))
if err != nil {
return nil, err
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto)
if err != nil {
return nil, err
}
}
return wakuMessageArchiveIndexProto, nil
}
func (m *Manager) TorrentFileExists(communityID string) bool {
_, err := os.Stat(m.torrentFile(communityID))
return err == nil
}
func (m *Manager) torrentFile(communityID string) string {
return m.torrentConfig.TorrentDir + "/" + communityID + ".torrent"
}
func (m *Manager) archiveIndexFile(communityID string) string {
return m.torrentConfig.DataDir + "/" + communityID + "/index"
}
func (m *Manager) archiveDataFile(communityID string) string {
return m.torrentConfig.DataDir + "/" + communityID + "/data"
}
func topicsAsByteArrays(topics []types.TopicType) [][]byte {
var topicsAsByteArrays [][]byte
for _, t := range topics {
topic := types.TopicTypeToByteArray(t)
topicsAsByteArrays = append(topicsAsByteArrays, topic)
}
return topicsAsByteArrays
}
func findIndexFile(files []*torrent.File) (index int, ok bool) {
for i, f := range files {
if f.DisplayPath() == "index" {
return i, true
}
}
return 0, false
}