mirror of
https://github.com/status-im/status-go.git
synced 2025-02-13 07:17:04 +00:00
This commit introduces a few changes regarding users accessing communities: While the APIs still exist, community invites should no longer be used; instead, communities should merely be "shared". Sharing a community to users allows users to "join" the community, which in reality makes them request access to that community. This means users have to request access to any community, even if the community has permissions set to NO_MEMBERSHIP. The only difference between ON_REQUEST and NO_MEMBERSHIP is that ON_REQUEST communities require manual approval of the owner/admin to access a community. NO_MEMBERSHIP communities accept automatically (as soon as owner/admin receives the request). This also implies that users are no longer optimistically added to the member list of communities, but only after they have been accepted. This introduces a bit of a message ping-pong for users to know that someone is now part of a community.
2062 lines
54 KiB
Go
2062 lines
54 KiB
Go
package communities
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/anacrolix/torrent"
|
|
"github.com/anacrolix/torrent/bencode"
|
|
"github.com/anacrolix/torrent/metainfo"
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/params"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/ens"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/requests"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
"github.com/status-im/status-go/signal"
|
|
)
|
|
|
|
var (
	// defaultAnnounceList lists tracker announce URLs, one URL per inner
	// slice (presumably one tracker tier each — confirm against the torrent
	// metainfo usage).
	defaultAnnounceList = [][]string{
		{"udp://tracker.opentrackr.org:1337/announce"},
		{"udp://tracker.openbittorrent.com:6969/announce"},
	}

	// pieceLength equals 100 * 1024 (presumably the torrent piece size in
	// bytes — confirm against its users).
	pieceLength = 100 * 1024
)
|
|
|
|
type Manager struct {
|
|
persistence *Persistence
|
|
ensSubscription chan []*ens.VerificationRecord
|
|
subscriptions []chan *Subscription
|
|
ensVerifier *ens.Verifier
|
|
identity *ecdsa.PublicKey
|
|
logger *zap.Logger
|
|
transport *transport.Transport
|
|
quit chan struct{}
|
|
torrentConfig *params.TorrentConfig
|
|
torrentClient *torrent.Client
|
|
historyArchiveTasksWaitGroup sync.WaitGroup
|
|
historyArchiveTasks map[string]chan struct{}
|
|
torrentTasks map[string]metainfo.Hash
|
|
}
|
|
|
|
func NewManager(identity *ecdsa.PublicKey, db *sql.DB, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
|
|
if identity == nil {
|
|
return nil, errors.New("empty identity")
|
|
}
|
|
|
|
var err error
|
|
if logger == nil {
|
|
if logger, err = zap.NewDevelopment(); err != nil {
|
|
return nil, errors.Wrap(err, "failed to create a logger")
|
|
}
|
|
}
|
|
|
|
manager := &Manager{
|
|
logger: logger,
|
|
identity: identity,
|
|
quit: make(chan struct{}),
|
|
transport: transport,
|
|
torrentConfig: torrentConfig,
|
|
historyArchiveTasks: make(map[string]chan struct{}),
|
|
torrentTasks: make(map[string]metainfo.Hash),
|
|
persistence: &Persistence{
|
|
logger: logger,
|
|
db: db,
|
|
},
|
|
}
|
|
|
|
if verifier != nil {
|
|
|
|
sub := verifier.Subscribe()
|
|
manager.ensSubscription = sub
|
|
manager.ensVerifier = verifier
|
|
}
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
// archiveMetadata pairs a hash string with a from value.
type archiveMetadata struct {
	hash string
	from uint64
}

// archiveMDSlice provides Len, Swap and Less so it can be sorted; Less
// orders entries by descending from value.
type archiveMDSlice []*archiveMetadata

// Len reports the number of entries.
func (md archiveMDSlice) Len() int { return len(md) }

// Swap exchanges the entries at positions i and j.
func (md archiveMDSlice) Swap(i, j int) {
	first, second := md[i], md[j]
	md[i] = second
	md[j] = first
}

// Less reports whether entry i has a larger from value than entry j.
func (md archiveMDSlice) Less(i, j int) bool {
	return md[j].from < md[i].from
}
|
|
|
|
type Subscription struct {
|
|
Community *Community
|
|
Invitations []*protobuf.CommunityInvitation
|
|
CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal
|
|
HistoryArchivesCreatedSignal *signal.HistoryArchivesCreatedSignal
|
|
NoHistoryArchivesCreatedSignal *signal.NoHistoryArchivesCreatedSignal
|
|
HistoryArchivesSeedingSignal *signal.HistoryArchivesSeedingSignal
|
|
HistoryArchivesUnseededSignal *signal.HistoryArchivesUnseededSignal
|
|
HistoryArchiveDownloadedSignal *signal.HistoryArchiveDownloadedSignal
|
|
}
|
|
|
|
type CommunityResponse struct {
|
|
Community *Community `json:"community"`
|
|
Changes *CommunityChanges `json:"changes"`
|
|
}
|
|
|
|
func (m *Manager) Subscribe() chan *Subscription {
|
|
subscription := make(chan *Subscription, 100)
|
|
m.subscriptions = append(m.subscriptions, subscription)
|
|
return subscription
|
|
}
|
|
|
|
func (m *Manager) Start() error {
|
|
if m.ensVerifier != nil {
|
|
m.runENSVerificationLoop()
|
|
}
|
|
|
|
if m.torrentConfig != nil && m.torrentConfig.Enabled {
|
|
err := m.StartTorrentClient()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) runENSVerificationLoop() {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-m.quit:
|
|
m.logger.Debug("quitting ens verification loop")
|
|
return
|
|
case records, more := <-m.ensSubscription:
|
|
if !more {
|
|
m.logger.Debug("no more ens records, quitting")
|
|
return
|
|
}
|
|
m.logger.Info("received records", zap.Any("records", records))
|
|
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (m *Manager) Stop() error {
|
|
close(m.quit)
|
|
for _, c := range m.subscriptions {
|
|
close(c)
|
|
}
|
|
m.StopTorrentClient()
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) SetTorrentConfig(config *params.TorrentConfig) {
|
|
m.torrentConfig = config
|
|
}
|
|
|
|
func (m *Manager) StartTorrentClient() error {
|
|
if m.torrentConfig == nil {
|
|
return fmt.Errorf("Can't start torrent client: missing torrentConfig")
|
|
}
|
|
|
|
if m.TorrentClientStarted() {
|
|
return nil
|
|
}
|
|
|
|
config := torrent.NewDefaultClientConfig()
|
|
config.SetListenAddr(":" + fmt.Sprint(m.torrentConfig.Port))
|
|
config.Seed = true
|
|
|
|
config.DataDir = m.torrentConfig.DataDir
|
|
|
|
if _, err := os.Stat(m.torrentConfig.DataDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(m.torrentConfig.DataDir, 0700)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
m.logger.Info("Starting torrent client", zap.Any("port", m.torrentConfig.Port))
|
|
// Instantiating the client will make it bootstrap and listen eagerly,
|
|
// so no go routine is needed here
|
|
client, err := torrent.NewClient(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.torrentClient = client
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) StopTorrentClient() []error {
|
|
if m.TorrentClientStarted() {
|
|
m.StopHistoryArchiveTasksIntervals()
|
|
m.logger.Info("Stopping torrent client")
|
|
errs := m.torrentClient.Close()
|
|
if len(errs) > 0 {
|
|
return errs
|
|
}
|
|
m.torrentClient = nil
|
|
}
|
|
return make([]error, 0)
|
|
}
|
|
|
|
func (m *Manager) TorrentClientStarted() bool {
|
|
return m.torrentClient != nil
|
|
}
|
|
|
|
func (m *Manager) publish(subscription *Subscription) {
|
|
for _, s := range m.subscriptions {
|
|
select {
|
|
case s <- subscription:
|
|
default:
|
|
m.logger.Warn("subscription channel full, dropping message")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) All() ([]*Community, error) {
|
|
return m.persistence.AllCommunities(m.identity)
|
|
}
|
|
|
|
type KnownCommunitiesResponse struct {
|
|
ContractCommunities []string `json:"contractCommunities"`
|
|
Descriptions map[string]*Community `json:"communities"`
|
|
UnknownCommunities []string `json:"unknownCommunities"`
|
|
}
|
|
|
|
func (m *Manager) GetStoredDescriptionForCommunities(communityIDs []types.HexBytes) (response *KnownCommunitiesResponse, err error) {
|
|
response = &KnownCommunitiesResponse{
|
|
Descriptions: make(map[string]*Community),
|
|
}
|
|
|
|
for i := range communityIDs {
|
|
communityID := communityIDs[i].String()
|
|
var community *Community
|
|
community, err = m.GetByID(communityIDs[i])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
response.ContractCommunities = append(response.ContractCommunities, communityID)
|
|
|
|
if community != nil {
|
|
response.Descriptions[community.IDString()] = community
|
|
} else {
|
|
response.UnknownCommunities = append(response.UnknownCommunities, communityID)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (m *Manager) Joined() ([]*Community, error) {
|
|
return m.persistence.JoinedCommunities(m.identity)
|
|
}
|
|
|
|
func (m *Manager) JoinedAndPendingCommunitiesWithRequests() ([]*Community, error) {
|
|
return m.persistence.JoinedAndPendingCommunitiesWithRequests(m.identity)
|
|
}
|
|
|
|
func (m *Manager) DeletedCommunities() ([]*Community, error) {
|
|
return m.persistence.DeletedCommunities(m.identity)
|
|
}
|
|
|
|
func (m *Manager) Created() ([]*Community, error) {
|
|
return m.persistence.CreatedCommunities(m.identity)
|
|
}
|
|
|
|
// CreateCommunity takes a description, generates an ID for it, saves it and return it
|
|
func (m *Manager) CreateCommunity(request *requests.CreateCommunity) (*Community, error) {
|
|
|
|
description, err := request.ToCommunityDescription()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description.Members = make(map[string]*protobuf.CommunityMember)
|
|
description.Members[common.PubkeyToHex(m.identity)] = &protobuf.CommunityMember{Roles: []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL}}
|
|
|
|
err = ValidateCommunityDescription(description)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description.Clock = 1
|
|
|
|
key, err := crypto.GenerateKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config := Config{
|
|
ID: &key.PublicKey,
|
|
PrivateKey: key,
|
|
Logger: m.logger,
|
|
Joined: true,
|
|
MemberIdentity: m.identity,
|
|
CommunityDescription: description,
|
|
}
|
|
community, err := New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We join any community we create
|
|
community.Join()
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
// EditCommunity takes a description, updates the community with the description,
|
|
// saves it and returns it
|
|
func (m *Manager) EditCommunity(request *requests.EditCommunity) (*Community, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
if !community.IsAdmin() {
|
|
return nil, errors.New("not an admin")
|
|
}
|
|
|
|
newDescription, err := request.ToCommunityDescription()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Can't create community description: %v", err)
|
|
}
|
|
|
|
// If permissions weren't explicitly set on original request, use existing ones
|
|
if newDescription.Permissions.Access == protobuf.CommunityPermissions_UNKNOWN_ACCESS {
|
|
newDescription.Permissions.Access = community.config.CommunityDescription.Permissions.Access
|
|
}
|
|
// Use existing images for the entries that were not updated
|
|
// NOTE: This will NOT allow deletion of the community image; it will need to
|
|
// be handled separately.
|
|
for imageName := range community.config.CommunityDescription.Identity.Images {
|
|
_, exists := newDescription.Identity.Images[imageName]
|
|
if !exists {
|
|
// If no image was set in ToCommunityDescription then Images is nil.
|
|
if newDescription.Identity.Images == nil {
|
|
newDescription.Identity.Images = make(map[string]*protobuf.IdentityImage)
|
|
}
|
|
newDescription.Identity.Images[imageName] = community.config.CommunityDescription.Identity.Images[imageName]
|
|
}
|
|
}
|
|
// TODO: handle delete image (if needed)
|
|
|
|
err = ValidateCommunityDescription(newDescription)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Edit the community values
|
|
community.Edit(newDescription)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !community.IsAdmin() {
|
|
return nil, errors.New("not an admin")
|
|
}
|
|
|
|
return community.config.PrivateKey, nil
|
|
}
|
|
|
|
func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
|
|
communityID := crypto.CompressPubkey(&key.PublicKey)
|
|
|
|
community, err := m.persistence.GetByID(m.identity, communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community == nil {
|
|
description := &protobuf.CommunityDescription{
|
|
Permissions: &protobuf.CommunityPermissions{},
|
|
}
|
|
|
|
config := Config{
|
|
ID: &key.PublicKey,
|
|
PrivateKey: key,
|
|
Logger: m.logger,
|
|
Joined: true,
|
|
MemberIdentity: m.identity,
|
|
CommunityDescription: description,
|
|
}
|
|
community, err = New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
community.config.PrivateKey = key
|
|
}
|
|
|
|
community.Join()
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) CreateChat(communityID types.HexBytes, chat *protobuf.CommunityChat) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
chatID := uuid.New().String()
|
|
changes, err := community.CreateChat(chatID, chat)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) EditChat(communityID types.HexBytes, chatID string, chat *protobuf.CommunityChat) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(chatID, communityID.String()) {
|
|
chatID = strings.TrimPrefix(chatID, communityID.String())
|
|
}
|
|
|
|
changes, err := community.EditChat(chatID, chat)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) DeleteChat(communityID types.HexBytes, chatID string) (*Community, *protobuf.CommunityDescription, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(chatID, communityID.String()) {
|
|
chatID = strings.TrimPrefix(chatID, communityID.String())
|
|
}
|
|
description, err := community.DeleteChat(chatID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, description, nil
|
|
}
|
|
|
|
func (m *Manager) CreateCategory(request *requests.CreateCommunityCategory) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
categoryID := uuid.New().String()
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
for i, cid := range request.ChatIDs {
|
|
if strings.HasPrefix(cid, request.CommunityID.String()) {
|
|
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
|
|
}
|
|
}
|
|
|
|
changes, err := community.CreateCategory(categoryID, request.CategoryName, request.ChatIDs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) EditCategory(request *requests.EditCommunityCategory) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
for i, cid := range request.ChatIDs {
|
|
if strings.HasPrefix(cid, request.CommunityID.String()) {
|
|
request.ChatIDs[i] = strings.TrimPrefix(cid, request.CommunityID.String())
|
|
}
|
|
}
|
|
|
|
changes, err := community.EditCategory(request.CategoryID, request.CategoryName, request.ChatIDs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) ReorderCategories(request *requests.ReorderCommunityCategories) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
changes, err := community.ReorderCategories(request.CategoryID, request.Position)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) ReorderChat(request *requests.ReorderCommunityChat) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
// Remove communityID prefix from chatID if exists
|
|
if strings.HasPrefix(request.ChatID, request.CommunityID.String()) {
|
|
request.ChatID = strings.TrimPrefix(request.ChatID, request.CommunityID.String())
|
|
}
|
|
|
|
changes, err := community.ReorderChat(request.CategoryID, request.ChatID, request.Position)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) DeleteCategory(request *requests.DeleteCommunityCategory) (*Community, *CommunityChanges, error) {
|
|
community, err := m.GetByID(request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, nil, ErrOrgNotFound
|
|
}
|
|
|
|
changes, err := community.DeleteCategory(request.CategoryID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Advertise changes
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, changes, nil
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte) (*CommunityResponse, error) {
|
|
id := crypto.CompressPubkey(signer)
|
|
community, err := m.persistence.GetByID(m.identity, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community == nil {
|
|
config := Config{
|
|
CommunityDescription: description,
|
|
Logger: m.logger,
|
|
MarshaledCommunityDescription: payload,
|
|
MemberIdentity: m.identity,
|
|
ID: signer,
|
|
}
|
|
|
|
community, err = New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
changes, err := community.UpdateCommunityDescription(signer, description, payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
hasCommunityArchiveInfo, err := m.persistence.HasCommunityArchiveInfo(community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cdMagnetlinkClock := community.config.CommunityDescription.ArchiveMagnetlinkClock
|
|
if !hasCommunityArchiveInfo {
|
|
err = m.persistence.SaveCommunityArchiveInfo(community.ID(), cdMagnetlinkClock, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
magnetlinkClock, err := m.persistence.GetMagnetlinkMessageClock(community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if cdMagnetlinkClock > magnetlinkClock {
|
|
err = m.persistence.UpdateMagnetlinkMessageClock(community.ID(), cdMagnetlinkClock)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
pkString := common.PubkeyToHex(m.identity)
|
|
|
|
// If the community require membership, we set whether we should leave/join the community after a state change
|
|
if community.InvitationOnly() || community.OnRequest() || community.AcceptRequestToJoinAutomatically() {
|
|
if changes.HasNewMember(pkString) {
|
|
hasPendingRequest, err := m.persistence.HasPendingRequestsToJoinForUserAndCommunity(pkString, changes.Community.ID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// If there's any pending request, we should join the community
|
|
// automatically
|
|
changes.ShouldMemberJoin = hasPendingRequest
|
|
}
|
|
|
|
if changes.HasMemberLeft(pkString) {
|
|
// If we joined previously the community, we should leave it
|
|
changes.ShouldMemberLeave = community.Joined()
|
|
}
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We mark our requests as completed, though maybe we should mark
|
|
// any request for any user that has been added as completed
|
|
if err := m.markRequestToJoin(m.identity, community); err != nil {
|
|
return nil, err
|
|
}
|
|
// Check if there's a change and we should be joining
|
|
|
|
return &CommunityResponse{
|
|
Community: community,
|
|
Changes: changes,
|
|
}, nil
|
|
}
|
|
|
|
// TODO: This is not fully implemented, we want to save the grant passed at
|
|
// this stage and make sure it's used when publishing.
|
|
func (m *Manager) HandleCommunityInvitation(signer *ecdsa.PublicKey, invitation *protobuf.CommunityInvitation, payload []byte) (*CommunityResponse, error) {
|
|
m.logger.Debug("Handling wrapped community description message")
|
|
|
|
community, err := m.HandleWrappedCommunityDescriptionMessage(payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Save grant
|
|
|
|
return community, nil
|
|
}
|
|
|
|
// markRequestToJoin marks all the pending requests to join as completed
|
|
// if we are members
|
|
func (m *Manager) markRequestToJoin(pk *ecdsa.PublicKey, community *Community) error {
|
|
if community.HasMember(pk) {
|
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(pk), community.ID(), RequestToJoinStateAccepted)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) SetMuted(id types.HexBytes, muted bool) error {
|
|
return m.persistence.SetMuted(id, muted)
|
|
}
|
|
|
|
func (m *Manager) AcceptRequestToJoin(request *requests.AcceptRequestToJoinCommunity) (*Community, error) {
|
|
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(dbRequest.CommunityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pk, err := common.HexToPubkey(dbRequest.PublicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = community.AddMember(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := m.markRequestToJoin(pk, community); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetRequestToJoin(ID types.HexBytes) (*RequestToJoin, error) {
|
|
return m.persistence.GetRequestToJoin(ID)
|
|
}
|
|
|
|
func (m *Manager) DeclineRequestToJoin(request *requests.DeclineRequestToJoinCommunity) error {
|
|
dbRequest, err := m.persistence.GetRequestToJoin(request.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.persistence.SetRequestToJoinState(dbRequest.PublicKey, dbRequest.CommunityID, RequestToJoinStateDeclined)
|
|
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoin) (*RequestToJoin, error) {
|
|
community, err := m.persistence.GetByID(m.identity, request.CommunityId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
// If they are already a member, ignore
|
|
if community.HasMember(signer) {
|
|
return nil, ErrAlreadyMember
|
|
}
|
|
|
|
if err := community.ValidateRequestToJoin(signer, request); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestToJoin := &RequestToJoin{
|
|
PublicKey: common.PubkeyToHex(signer),
|
|
Clock: request.Clock,
|
|
ENSName: request.EnsName,
|
|
CommunityID: request.CommunityId,
|
|
State: RequestToJoinStatePending,
|
|
}
|
|
|
|
requestToJoin.CalculateID()
|
|
|
|
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if community.AcceptRequestToJoinAutomatically() {
|
|
err = m.markRequestToJoin(signer, community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
requestToJoin.State = RequestToJoinStateAccepted
|
|
}
|
|
|
|
return requestToJoin, nil
|
|
}
|
|
|
|
func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoinResponse) error {
|
|
|
|
community, err := m.persistence.GetByID(m.identity, request.CommunityId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if community == nil {
|
|
return ErrOrgNotFound
|
|
}
|
|
|
|
communityDescriptionBytes, err := proto.Marshal(request.Community)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We need to wrap `request.Community` in an `ApplicationMetadataMessage`
|
|
// of type `CommunityDescription` because `UpdateCommunityDescription` expects this.
|
|
//
|
|
// This is merely for marsheling/unmarsheling, hence we attaching a `Signature`
|
|
// is not needed.
|
|
metadataMessage := &protobuf.ApplicationMetadataMessage{
|
|
Payload: communityDescriptionBytes,
|
|
Type: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
|
|
}
|
|
|
|
appMetadataMsg, err := proto.Marshal(metadataMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = community.UpdateCommunityDescription(signer, request.Community, appMetadataMsg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if request.Accepted {
|
|
return m.markRequestToJoin(m.identity, community)
|
|
}
|
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(m.identity), community.ID(), RequestToJoinStateDeclined)
|
|
}
|
|
|
|
func (m *Manager) HandleWrappedCommunityDescriptionMessage(payload []byte) (*CommunityResponse, error) {
|
|
m.logger.Debug("Handling wrapped community description message")
|
|
|
|
applicationMetadataMessage := &protobuf.ApplicationMetadataMessage{}
|
|
err := proto.Unmarshal(payload, applicationMetadataMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
|
|
return nil, ErrInvalidMessage
|
|
}
|
|
signer, err := applicationMetadataMessage.RecoverKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
description := &protobuf.CommunityDescription{}
|
|
|
|
err = proto.Unmarshal(applicationMetadataMessage.Payload, description)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return m.HandleCommunityDescriptionMessage(signer, description, payload)
|
|
}
|
|
|
|
func (m *Manager) JoinCommunity(id types.HexBytes) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
community.Join()
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
|
|
return m.persistence.GetMagnetlinkMessageClock(communityID)
|
|
}
|
|
|
|
func (m *Manager) UpdateCommunityDescriptionMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
|
|
community, err := m.GetByIDString(communityID.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
community.config.CommunityDescription.ArchiveMagnetlinkClock = clock
|
|
return m.persistence.SaveCommunity(community)
|
|
}
|
|
|
|
func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock uint64) error {
|
|
return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock)
|
|
}
|
|
|
|
func (m *Manager) LeaveCommunity(id types.HexBytes) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
if community.IsAdmin() {
|
|
_, err = community.RemoveUserFromOrg(m.identity)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
community.Leave()
|
|
err = m.persistence.SaveCommunity(community)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) inviteUsersToCommunity(community *Community, pks []*ecdsa.PublicKey) (*Community, error) {
|
|
var invitations []*protobuf.CommunityInvitation
|
|
for _, pk := range pks {
|
|
invitation, err := community.InviteUserToOrg(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// We mark the user request (if any) as completed
|
|
if err := m.markRequestToJoin(pk, community); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
invitations = append(invitations, invitation)
|
|
}
|
|
|
|
err := m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community, Invitations: invitations})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) InviteUsersToCommunity(communityID types.HexBytes, pks []*ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
return m.inviteUsersToCommunity(community, pks)
|
|
}
|
|
|
|
func (m *Manager) AddMemberToCommunity(communityID types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
err = community.AddMember(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) RemoveUserFromCommunity(id types.HexBytes, pk *ecdsa.PublicKey) (*Community, error) {
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.RemoveUserFromOrg(pk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) UnbanUserFromCommunity(request *requests.UnbanUserFromCommunity) (*Community, error) {
|
|
id := request.CommunityID
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.UnbanUserFromCommunity(publicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) BanUserFromCommunity(request *requests.BanUserFromCommunity) (*Community, error) {
|
|
id := request.CommunityID
|
|
|
|
publicKey, err := common.HexToPubkey(request.User.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
community, err := m.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if community == nil {
|
|
return nil, ErrOrgNotFound
|
|
}
|
|
|
|
_, err = community.BanUserFromCommunity(publicKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = m.persistence.SaveCommunity(community)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.publish(&Subscription{Community: community})
|
|
|
|
return community, nil
|
|
}
|
|
|
|
func (m *Manager) GetByID(id []byte) (*Community, error) {
|
|
return m.persistence.GetByID(m.identity, id)
|
|
}
|
|
|
|
func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
|
id, err := types.DecodeHex(idString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return m.GetByID(id)
|
|
}
|
|
|
|
func (m *Manager) RequestToJoin(requester *ecdsa.PublicKey, request *requests.RequestToJoinCommunity) (*Community, *RequestToJoin, error) {
|
|
community, err := m.persistence.GetByID(m.identity, request.CommunityID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// We don't allow requesting access if already a member
|
|
if community.HasMember(m.identity) {
|
|
return nil, nil, ErrAlreadyMember
|
|
}
|
|
|
|
clock := uint64(time.Now().Unix())
|
|
requestToJoin := &RequestToJoin{
|
|
PublicKey: common.PubkeyToHex(requester),
|
|
Clock: clock,
|
|
ENSName: request.ENSName,
|
|
CommunityID: request.CommunityID,
|
|
State: RequestToJoinStatePending,
|
|
Our: true,
|
|
}
|
|
|
|
requestToJoin.CalculateID()
|
|
|
|
if err := m.persistence.SaveRequestToJoin(requestToJoin); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
community.config.RequestedToJoinAt = uint64(time.Now().Unix())
|
|
community.AddRequestToJoin(requestToJoin)
|
|
|
|
return community, requestToJoin, nil
|
|
}
|
|
|
|
func (m *Manager) SaveRequestToJoin(request *RequestToJoin) error {
|
|
return m.persistence.SaveRequestToJoin(request)
|
|
}
|
|
|
|
func (m *Manager) PendingRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*RequestToJoin, error) {
|
|
return m.persistence.PendingRequestsToJoinForUser(common.PubkeyToHex(pk))
|
|
}
|
|
|
|
func (m *Manager) PendingRequestsToJoinForCommunity(id types.HexBytes) ([]*RequestToJoin, error) {
|
|
m.logger.Info("fetching pending invitations", zap.String("community-id", id.String()))
|
|
return m.persistence.PendingRequestsToJoinForCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) CanPost(pk *ecdsa.PublicKey, communityID string, chatID string, grant []byte) (bool, error) {
|
|
community, err := m.GetByIDString(communityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if community == nil {
|
|
return false, nil
|
|
}
|
|
return community.CanPost(pk, chatID, grant)
|
|
}
|
|
|
|
func (m *Manager) IsEncrypted(communityID string) (bool, error) {
|
|
community, err := m.GetByIDString(communityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return community.Encrypted(), nil
|
|
|
|
}
|
|
func (m *Manager) ShouldHandleSyncCommunity(community *protobuf.SyncCommunity) (bool, error) {
|
|
return m.persistence.ShouldHandleSyncCommunity(community)
|
|
}
|
|
|
|
func (m *Manager) ShouldHandleSyncCommunitySettings(communitySettings *protobuf.SyncCommunitySettings) (bool, error) {
|
|
return m.persistence.ShouldHandleSyncCommunitySettings(communitySettings)
|
|
}
|
|
|
|
func (m *Manager) HandleSyncCommunitySettings(syncCommunitySettings *protobuf.SyncCommunitySettings) (*CommunitySettings, error) {
|
|
id, err := types.DecodeHex(syncCommunitySettings.CommunityId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
settings, err := m.persistence.GetCommunitySettingsByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if settings == nil {
|
|
settings = &CommunitySettings{
|
|
CommunityID: syncCommunitySettings.CommunityId,
|
|
HistoryArchiveSupportEnabled: syncCommunitySettings.HistoryArchiveSupportEnabled,
|
|
Clock: syncCommunitySettings.Clock,
|
|
}
|
|
}
|
|
|
|
if syncCommunitySettings.Clock > settings.Clock {
|
|
settings.CommunityID = syncCommunitySettings.CommunityId
|
|
settings.HistoryArchiveSupportEnabled = syncCommunitySettings.HistoryArchiveSupportEnabled
|
|
settings.Clock = syncCommunitySettings.Clock
|
|
}
|
|
|
|
err = m.persistence.SaveCommunitySettings(*settings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return settings, nil
|
|
}
|
|
|
|
func (m *Manager) SetSyncClock(id []byte, clock uint64) error {
|
|
return m.persistence.SetSyncClock(id, clock)
|
|
}
|
|
|
|
func (m *Manager) SetPrivateKey(id []byte, privKey *ecdsa.PrivateKey) error {
|
|
return m.persistence.SetPrivateKey(id, privKey)
|
|
}
|
|
|
|
func (m *Manager) GetSyncedRawCommunity(id []byte) (*RawCommunityRow, error) {
|
|
return m.persistence.getSyncedRawCommunity(id)
|
|
}
|
|
|
|
func (m *Manager) GetCommunitySettingsByID(id types.HexBytes) (*CommunitySettings, error) {
|
|
return m.persistence.GetCommunitySettingsByID(id)
|
|
}
|
|
|
|
func (m *Manager) GetCommunitiesSettings() ([]CommunitySettings, error) {
|
|
return m.persistence.GetCommunitiesSettings()
|
|
}
|
|
|
|
func (m *Manager) SaveCommunitySettings(settings CommunitySettings) error {
|
|
return m.persistence.SaveCommunitySettings(settings)
|
|
}
|
|
|
|
func (m *Manager) CommunitySettingsExist(id types.HexBytes) (bool, error) {
|
|
return m.persistence.CommunitySettingsExist(id)
|
|
}
|
|
|
|
func (m *Manager) DeleteCommunitySettings(id types.HexBytes) error {
|
|
return m.persistence.DeleteCommunitySettings(id)
|
|
}
|
|
|
|
func (m *Manager) UpdateCommunitySettings(settings CommunitySettings) error {
|
|
return m.persistence.UpdateCommunitySettings(settings)
|
|
}
|
|
|
|
func (m *Manager) GetAdminCommunitiesChatIDs() (map[string]bool, error) {
|
|
adminCommunities, err := m.Created()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chatIDs := make(map[string]bool)
|
|
for _, c := range adminCommunities {
|
|
if c.Joined() {
|
|
for _, id := range c.ChatIDs() {
|
|
chatIDs[id] = true
|
|
}
|
|
}
|
|
}
|
|
return chatIDs, nil
|
|
}
|
|
|
|
func (m *Manager) IsAdminCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
|
|
adminCommunities, err := m.Created()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, c := range adminCommunities {
|
|
if c.PrivateKey().PublicKey.Equal(pubKey) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (m *Manager) IsJoinedCommunity(pubKey *ecdsa.PublicKey) (bool, error) {
|
|
community, err := m.GetByID(crypto.CompressPubkey(pubKey))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return community != nil && community.Joined(), nil
|
|
}
|
|
|
|
func (m *Manager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
|
|
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filters := []*transport.Filter{}
|
|
for _, cid := range chatIDs {
|
|
filters = append(filters, m.transport.FilterByChatID(cid))
|
|
}
|
|
return filters, nil
|
|
}
|
|
|
|
func (m *Manager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.TopicType, error) {
|
|
filters, err := m.GetCommunityChatsFilters(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topics := []types.TopicType{}
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
func (m *Manager) StoreWakuMessage(message *types.Message) error {
|
|
return m.persistence.SaveWakuMessage(message)
|
|
}
|
|
|
|
func (m *Manager) GetLatestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
|
|
return m.persistence.GetLatestWakuMessageTimestamp(topics)
|
|
}
|
|
|
|
func (m *Manager) GetOldestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
|
|
return m.persistence.GetOldestWakuMessageTimestamp(topics)
|
|
}
|
|
|
|
func (m *Manager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint64, error) {
|
|
return m.persistence.GetLastMessageArchiveEndDate(communityID)
|
|
}
|
|
|
|
func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
|
|
filters, err := m.GetCommunityChatsFilters(communityID)
|
|
if err != nil {
|
|
m.logger.Warn("failed to get community chats filters", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
|
|
if len(filters) == 0 {
|
|
// If we don't have chat filters, we likely don't have any chats
|
|
// associated to this community, which means there's nothing more
|
|
// to do here
|
|
return 0, nil
|
|
}
|
|
|
|
topics := []types.TopicType{}
|
|
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
|
|
if err != nil {
|
|
m.logger.Debug("failed to get last archive end date", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// If we don't have a tracked last message archive end date, it
|
|
// means we haven't created an archive before, which means
|
|
// the next thing to look at is the oldest waku message timestamp for
|
|
// this community
|
|
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
|
|
if err != nil {
|
|
m.logger.Warn("failed to get oldest waku message timestamp", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// This means there's no waku message stored for this community so far
|
|
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
|
|
return 0, nil
|
|
}
|
|
}
|
|
|
|
return lastArchiveEndDateTimestamp, nil
|
|
}
|
|
|
|
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error {
|
|
m.UnseedHistoryArchiveTorrent(communityID)
|
|
_, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return m.SeedHistoryArchiveTorrent(communityID)
|
|
}
|
|
|
|
func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
|
|
id := community.IDString()
|
|
_, exists := m.historyArchiveTasks[id]
|
|
|
|
if exists {
|
|
m.logger.Debug("History archive tasks interval already runs for community: ", zap.Any("id", id))
|
|
return
|
|
}
|
|
|
|
cancel := make(chan struct{})
|
|
m.historyArchiveTasks[id] = cancel
|
|
m.historyArchiveTasksWaitGroup.Add(1)
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
m.logger.Debug("Starting history archive tasks interval", zap.Any("id", id))
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
m.logger.Debug("Executing history archive tasks", zap.Any("id", id))
|
|
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to get last archive end date", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if lastArchiveEndDateTimestamp == 0 {
|
|
// This means there are no waku messages for this community,
|
|
// so nothing to do here
|
|
continue
|
|
}
|
|
|
|
topics, err := m.GetCommunityChatsTopics(community.ID())
|
|
if err != nil {
|
|
m.logger.Debug("failed to get community chats topics", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
ts := time.Now().Unix()
|
|
to := time.Unix(ts, 0)
|
|
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
|
|
|
|
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval)
|
|
if err != nil {
|
|
m.logger.Debug("failed to create and seed history archive", zap.Error(err))
|
|
continue
|
|
}
|
|
case <-cancel:
|
|
m.UnseedHistoryArchiveTorrent(community.ID())
|
|
delete(m.historyArchiveTasks, id)
|
|
m.historyArchiveTasksWaitGroup.Done()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) StopHistoryArchiveTasksIntervals() {
|
|
for _, t := range m.historyArchiveTasks {
|
|
close(t)
|
|
}
|
|
// Stoping archive interval tasks is async, so we need
|
|
// to wait for all of them to be closed before we shutdown
|
|
// the torrent client
|
|
m.historyArchiveTasksWaitGroup.Wait()
|
|
}
|
|
|
|
func (m *Manager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
|
|
task, ok := m.historyArchiveTasks[communityID.String()]
|
|
if ok {
|
|
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
|
|
close(task)
|
|
}
|
|
}
|
|
|
|
// EncodedArchiveData pairs the serialized bytes of one message archive with
// the number of zero bytes to append after it in the archive data file.
type EncodedArchiveData struct {
	// padding is the count of zero bytes written after bytes.
	padding int
	// bytes holds the marshalled archive.
	bytes []byte
}
|
|
|
|
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) ([]string, error) {
|
|
|
|
from := startDate
|
|
to := from.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
|
|
archiveDir := m.torrentConfig.DataDir + "/" + communityID.String()
|
|
torrentDir := m.torrentConfig.TorrentDir
|
|
indexPath := archiveDir + "/index"
|
|
dataPath := archiveDir + "/data"
|
|
|
|
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
|
wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata)
|
|
archiveIDs := make([]string, 0)
|
|
|
|
if _, err := os.Stat(archiveDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(archiveDir, 0700)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
if _, err := os.Stat(torrentDir); os.IsNotExist(err) {
|
|
err := os.MkdirAll(torrentDir, 0700)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
_, err := os.Stat(indexPath)
|
|
if err == nil {
|
|
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
var offset uint64 = 0
|
|
|
|
for hash, metadata := range wakuMessageArchiveIndexProto.Archives {
|
|
offset = offset + metadata.Size
|
|
wakuMessageArchiveIndex[hash] = metadata
|
|
}
|
|
|
|
var encodedArchives []*EncodedArchiveData
|
|
topicsAsByteArrays := topicsAsByteArrays(topics)
|
|
|
|
m.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{
|
|
CommunityID: communityID.String(),
|
|
}})
|
|
|
|
m.logger.Debug("Creating archives...",
|
|
zap.Any("startDate", startDate),
|
|
zap.Any("endDate", endDate),
|
|
zap.Duration("partition", partition),
|
|
)
|
|
for {
|
|
if from.Equal(endDate) || from.After(endDate) {
|
|
break
|
|
}
|
|
m.logger.Debug("Creating message archive",
|
|
zap.Duration("partition", partition),
|
|
zap.Any("from", from),
|
|
zap.Any("to", to),
|
|
)
|
|
messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
if len(messages) == 0 {
|
|
// No need to create an archive with zero messages
|
|
m.logger.Debug("No messages in this partition")
|
|
from = to
|
|
to = to.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
continue
|
|
}
|
|
|
|
wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays)
|
|
encodedArchive, err := proto.Marshal(wakuMessageArchive)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
rawSize := len(encodedArchive)
|
|
padding := 0
|
|
size := 0
|
|
|
|
if rawSize > pieceLength {
|
|
size = rawSize + pieceLength - (rawSize % pieceLength)
|
|
padding = size - rawSize
|
|
} else {
|
|
padding = pieceLength - rawSize
|
|
size = rawSize + padding
|
|
}
|
|
|
|
wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{
|
|
Metadata: wakuMessageArchive.Metadata,
|
|
Offset: offset,
|
|
Size: uint64(size),
|
|
Padding: uint64(padding),
|
|
}
|
|
|
|
wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()
|
|
archiveIDs = append(archiveIDs, archiveID)
|
|
wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata
|
|
encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding})
|
|
from = to
|
|
to = to.Add(partition)
|
|
if to.After(endDate) {
|
|
to = endDate
|
|
}
|
|
offset = offset + uint64(rawSize) + uint64(padding)
|
|
}
|
|
|
|
if len(encodedArchives) > 0 {
|
|
|
|
dataBytes := make([]byte, 0)
|
|
if _, err := os.Stat(dataPath); err == nil {
|
|
dataBytes, err = os.ReadFile(dataPath)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
}
|
|
|
|
for _, encodedArchiveData := range encodedArchives {
|
|
dataBytes = append(dataBytes, encodedArchiveData.bytes...)
|
|
dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...)
|
|
}
|
|
|
|
wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex
|
|
indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
err = os.WriteFile(dataPath, dataBytes, 0644) // nolint: gosec
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfo := metainfo.MetaInfo{
|
|
AnnounceList: defaultAnnounceList,
|
|
}
|
|
metaInfo.SetDefaults()
|
|
metaInfo.CreatedBy = common.PubkeyToHex(m.identity)
|
|
|
|
info := metainfo.Info{
|
|
PieceLength: int64(pieceLength),
|
|
}
|
|
|
|
err = info.BuildFromFilePath(archiveDir)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfo.InfoBytes, err = bencode.Marshal(info)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
metaInfoBytes, err := bencode.Marshal(metaInfo)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) // nolint: gosec
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(startDate.Unix()),
|
|
To: int(endDate.Unix()),
|
|
},
|
|
})
|
|
} else {
|
|
m.logger.Debug("No archives created")
|
|
m.publish(&Subscription{
|
|
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(startDate.Unix()),
|
|
To: int(endDate.Unix()),
|
|
},
|
|
})
|
|
}
|
|
|
|
lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID)
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
|
|
if lastMessageArchiveEndDate > 0 {
|
|
err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
|
|
} else {
|
|
err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
|
|
}
|
|
if err != nil {
|
|
return archiveIDs, err
|
|
}
|
|
return archiveIDs, nil
|
|
}
|
|
|
|
func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error {
|
|
m.UnseedHistoryArchiveTorrent(communityID)
|
|
|
|
id := communityID.String()
|
|
torrentFile := m.torrentFile(id)
|
|
|
|
metaInfo, err := metainfo.LoadFromFile(torrentFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
info, err := metaInfo.UnmarshalInfo()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hash := metaInfo.HashInfoBytes()
|
|
m.torrentTasks[id] = hash
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
torrent, err := m.torrentClient.AddTorrent(metaInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
torrent.DownloadAll()
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
|
|
CommunityID: communityID.String(),
|
|
},
|
|
})
|
|
m.logger.Info("Seeding torrent", zap.String("id", id))
|
|
m.logger.Info(metaInfo.Magnet(nil, &info).String())
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) UnseedHistoryArchiveTorrent(communityID types.HexBytes) {
|
|
id := communityID.String()
|
|
hash, exists := m.torrentTasks[id]
|
|
|
|
if exists {
|
|
torrent, ok := m.torrentClient.Torrent(hash)
|
|
if ok {
|
|
m.logger.Debug("Unseeding and dropping torrent for community: ", zap.Any("id", id))
|
|
torrent.Drop()
|
|
delete(m.torrentTasks, id)
|
|
|
|
m.publish(&Subscription{
|
|
HistoryArchivesUnseededSignal: &signal.HistoryArchivesUnseededSignal{
|
|
CommunityID: id,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) IsSeedingHistoryArchiveTorrent(communityID types.HexBytes) bool {
|
|
id := communityID.String()
|
|
hash := m.torrentTasks[id]
|
|
torrent, ok := m.torrentClient.Torrent(hash)
|
|
return ok && torrent.Seeding()
|
|
}
|
|
|
|
func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string) ([]string, error) {
|
|
|
|
id := communityID.String()
|
|
ml, err := metainfo.ParseMagnetUri(magnetlink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m.logger.Debug("adding torrent via magnetlink for community", zap.String("id", id), zap.String("magnetlink", magnetlink))
|
|
torrent, err := m.torrentClient.AddMagnet(magnetlink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.torrentTasks[id] = ml.InfoHash
|
|
timeout := time.After(20 * time.Second)
|
|
select {
|
|
case <-timeout:
|
|
return nil, errors.New("torrent has timed out")
|
|
case <-torrent.GotInfo():
|
|
files := torrent.Files()
|
|
|
|
i, ok := findIndexFile(files)
|
|
if !ok {
|
|
// We're dealing with a malformed torrent, so don't do anything
|
|
return nil, errors.New("malformed torrent data")
|
|
}
|
|
|
|
indexFile := files[i]
|
|
indexFile.Download()
|
|
m.logger.Debug("downloading history archive index")
|
|
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
<-ticker.C
|
|
if indexFile.BytesCompleted() == indexFile.Length() {
|
|
index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var archiveIDs []string
|
|
|
|
archiveHashes := make(archiveMDSlice, 0, len(index.Archives))
|
|
|
|
for hash, metadata := range index.Archives {
|
|
archiveHashes = append(archiveHashes, &archiveMetadata{hash: hash, from: metadata.Metadata.From})
|
|
}
|
|
|
|
sort.Sort(archiveHashes)
|
|
|
|
for _, hd := range archiveHashes {
|
|
hash := hd.hash
|
|
metadata := index.Archives[hash]
|
|
hasArchive, err := m.persistence.HasMessageArchiveID(communityID, hash)
|
|
if err != nil {
|
|
m.logger.Debug("Failed to check if has message archive id", zap.Error(err))
|
|
continue
|
|
}
|
|
if hasArchive {
|
|
continue
|
|
}
|
|
|
|
startIndex := int(metadata.Offset) / pieceLength
|
|
endIndex := startIndex + int(metadata.Size)/pieceLength
|
|
|
|
m.logger.Debug("downloading data for message archive", zap.String("hash", hash))
|
|
m.logger.Debug("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
|
|
torrent.DownloadPieces(startIndex, endIndex)
|
|
|
|
psc := torrent.SubscribePieceStateChanges()
|
|
for {
|
|
i := startIndex
|
|
done := false
|
|
for {
|
|
if i > endIndex-1 {
|
|
break
|
|
}
|
|
done = torrent.PieceState(i).Complete
|
|
i++
|
|
}
|
|
if done {
|
|
psc.Close()
|
|
break
|
|
}
|
|
<-psc.Values
|
|
}
|
|
archiveIDs = append(archiveIDs, hash)
|
|
err = m.persistence.SaveMessageArchiveID(communityID, hash)
|
|
if err != nil {
|
|
m.logger.Debug("couldn't save message archive ID", zap.Error(err))
|
|
continue
|
|
}
|
|
m.publish(&Subscription{
|
|
HistoryArchiveDownloadedSignal: &signal.HistoryArchiveDownloadedSignal{
|
|
CommunityID: communityID.String(),
|
|
From: int(metadata.Metadata.From),
|
|
To: int(metadata.Metadata.To),
|
|
},
|
|
})
|
|
}
|
|
m.publish(&Subscription{
|
|
HistoryArchivesSeedingSignal: &signal.HistoryArchivesSeedingSignal{
|
|
CommunityID: communityID.String(),
|
|
},
|
|
})
|
|
return archiveIDs, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) {
|
|
id := communityID.String()
|
|
|
|
index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
totalData, err := os.ReadFile(m.archiveDataFile(id))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
messages := make(map[transport.Filter][]*types.Message)
|
|
|
|
for _, hash := range archiveIDs {
|
|
metadata := index.Archives[hash]
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
|
|
|
|
err := proto.Unmarshal(data, archive)
|
|
if err != nil {
|
|
m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
for _, message := range archive.Messages {
|
|
filter := m.transport.FilterByTopic(message.Topic)
|
|
if filter != nil {
|
|
shhMessage := &types.Message{
|
|
Sig: message.Sig,
|
|
Timestamp: uint32(message.Timestamp),
|
|
Topic: types.BytesToTopic(message.Topic),
|
|
Payload: message.Payload,
|
|
Padding: message.Padding,
|
|
Hash: message.Hash,
|
|
}
|
|
messages[*filter] = append(messages[*filter], shhMessage)
|
|
}
|
|
}
|
|
}
|
|
return messages, nil
|
|
}
|
|
|
|
func (m *Manager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) {
|
|
id := communityID.String()
|
|
torrentFile := m.torrentFile(id)
|
|
|
|
metaInfo, err := metainfo.LoadFromFile(torrentFile)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
info, err := metaInfo.UnmarshalInfo()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return metaInfo.Magnet(nil, &info).String(), nil
|
|
}
|
|
|
|
func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
|
|
var wakuMessages []*protobuf.WakuMessage
|
|
|
|
for _, msg := range messages {
|
|
topic := types.TopicTypeToByteArray(msg.Topic)
|
|
wakuMessage := &protobuf.WakuMessage{
|
|
Sig: msg.Sig,
|
|
Timestamp: uint64(msg.Timestamp),
|
|
Topic: topic,
|
|
Payload: msg.Payload,
|
|
Padding: msg.Padding,
|
|
Hash: msg.Hash,
|
|
}
|
|
wakuMessages = append(wakuMessages, wakuMessage)
|
|
}
|
|
|
|
metadata := protobuf.WakuMessageArchiveMetadata{
|
|
From: uint64(from.Unix()),
|
|
To: uint64(to.Unix()),
|
|
ContentTopic: topics,
|
|
}
|
|
|
|
wakuMessageArchive := &protobuf.WakuMessageArchive{
|
|
Metadata: &metadata,
|
|
Messages: wakuMessages,
|
|
}
|
|
return wakuMessageArchive
|
|
}
|
|
|
|
func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
|
|
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
|
|
|
indexPath := m.archiveIndexFile(communityID.String())
|
|
indexData, err := os.ReadFile(indexPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return wakuMessageArchiveIndexProto, nil
|
|
}
|
|
|
|
func (m *Manager) torrentFile(communityID string) string {
|
|
return m.torrentConfig.TorrentDir + "/" + communityID + ".torrent"
|
|
}
|
|
|
|
func (m *Manager) archiveIndexFile(communityID string) string {
|
|
return m.torrentConfig.DataDir + "/" + communityID + "/index"
|
|
}
|
|
|
|
func (m *Manager) archiveDataFile(communityID string) string {
|
|
return m.torrentConfig.DataDir + "/" + communityID + "/data"
|
|
}
|
|
|
|
func topicsAsByteArrays(topics []types.TopicType) [][]byte {
|
|
var topicsAsByteArrays [][]byte
|
|
for _, t := range topics {
|
|
topic := types.TopicTypeToByteArray(t)
|
|
topicsAsByteArrays = append(topicsAsByteArrays, topic)
|
|
}
|
|
return topicsAsByteArrays
|
|
}
|
|
|
|
func findIndexFile(files []*torrent.File) (index int, ok bool) {
|
|
for i, f := range files {
|
|
if f.DisplayPath() == "index" {
|
|
return i, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|