2020-11-18 09:16:51 +00:00
|
|
|
package communities
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2023-04-25 12:00:17 +00:00
|
|
|
"context"
|
2022-04-15 18:20:12 +00:00
|
|
|
"image"
|
|
|
|
"image/png"
|
2022-03-21 14:18:36 +00:00
|
|
|
"io/ioutil"
|
2023-04-25 12:00:17 +00:00
|
|
|
"math"
|
|
|
|
"math/big"
|
2022-03-21 14:18:36 +00:00
|
|
|
"os"
|
2020-11-18 09:16:51 +00:00
|
|
|
"testing"
|
2022-03-21 14:18:36 +00:00
|
|
|
"time"
|
2020-11-18 09:16:51 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
gethcommon "github.com/ethereum/go-ethereum/common"
|
|
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
|
|
|
"github.com/ethereum/go-ethereum/event"
|
2022-03-21 14:18:36 +00:00
|
|
|
"github.com/status-im/status-go/appdatabase"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
2022-04-15 18:20:12 +00:00
|
|
|
userimages "github.com/status-im/status-go/images"
|
2022-03-21 14:18:36 +00:00
|
|
|
"github.com/status-im/status-go/params"
|
2021-05-18 19:32:15 +00:00
|
|
|
"github.com/status-im/status-go/protocol/requests"
|
2022-03-21 14:18:36 +00:00
|
|
|
"github.com/status-im/status-go/protocol/transport"
|
2023-04-25 12:00:17 +00:00
|
|
|
"github.com/status-im/status-go/services/wallet/thirdparty/opensea"
|
2021-05-18 19:32:15 +00:00
|
|
|
|
2020-11-18 09:16:51 +00:00
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
_ "github.com/mutecomm/go-sqlcipher" // require go-sqlcipher that overrides default implementation
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
|
2021-01-11 10:32:51 +00:00
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
2020-11-18 09:16:51 +00:00
|
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
|
|
"github.com/status-im/status-go/protocol/sqlite"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestManagerSuite(t *testing.T) {
|
|
|
|
suite.Run(t, new(ManagerSuite))
|
|
|
|
}
|
|
|
|
|
|
|
|
type ManagerSuite struct {
|
|
|
|
suite.Suite
|
|
|
|
manager *Manager
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) SetupTest() {
|
2022-03-21 14:18:36 +00:00
|
|
|
dbPath, err := ioutil.TempFile("", "")
|
|
|
|
s.NoError(err, "creating temp file for db")
|
2022-09-27 20:27:20 +00:00
|
|
|
db, err := appdatabase.InitializeDB(dbPath.Name(), "", sqlite.ReducedKDFIterationsNumber)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.NoError(err, "creating sqlite db instance")
|
|
|
|
err = sqlite.Migrate(db)
|
|
|
|
s.NoError(err, "protocol migrate")
|
|
|
|
|
2021-01-11 10:32:51 +00:00
|
|
|
key, err := crypto.GenerateKey()
|
2020-11-18 09:16:51 +00:00
|
|
|
s.Require().NoError(err)
|
2021-01-11 10:32:51 +00:00
|
|
|
s.Require().NoError(err)
|
2022-10-14 09:26:10 +00:00
|
|
|
m, err := NewManager(key, db, nil, nil, nil, nil, nil)
|
2021-01-11 10:32:51 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NoError(m.Start())
|
2020-11-18 09:16:51 +00:00
|
|
|
s.manager = m
|
|
|
|
}
|
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
func intToBig(n int64) *hexutil.Big {
|
|
|
|
return (*hexutil.Big)(big.NewInt(n))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) getHistoryTasksCount() int {
|
2023-05-05 16:40:18 +00:00
|
|
|
// sync.Map doesn't have a Len function, so we need to count manually
|
|
|
|
count := 0
|
|
|
|
s.manager.historyArchiveTasks.Range(func(_, _ interface{}) bool {
|
|
|
|
count++
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
return count
|
|
|
|
}
|
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
type openseaClientTestBuilder struct {
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *openseaClientTestBuilder) NewOpenseaClient(chainID uint64, apiKey string, feed *event.Feed) (openseaClient, error) {
|
|
|
|
return opensea.NewOpenseaClient(chainID, apiKey, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
type testTokenManager struct {
|
|
|
|
response map[uint64]map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *testTokenManager) setResponse(chainID uint64, walletAddress, tokenAddress gethcommon.Address, balance int64) {
|
|
|
|
|
|
|
|
if m.response == nil {
|
|
|
|
m.response = make(map[uint64]map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big)
|
|
|
|
}
|
|
|
|
|
|
|
|
if m.response[chainID] == nil {
|
|
|
|
m.response[chainID] = make(map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big)
|
|
|
|
}
|
|
|
|
|
|
|
|
if m.response[chainID][walletAddress] == nil {
|
|
|
|
m.response[chainID][walletAddress] = make(map[gethcommon.Address]*hexutil.Big)
|
|
|
|
}
|
|
|
|
|
|
|
|
m.response[chainID][walletAddress][tokenAddress] = intToBig(balance)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *testTokenManager) GetBalancesByChain(ctx context.Context, accounts, tokenAddresses []gethcommon.Address) (map[uint64]map[gethcommon.Address]map[gethcommon.Address]*hexutil.Big, error) {
|
|
|
|
return m.response, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestRetrieveTokens() {
|
|
|
|
db, err := appdatabase.InitializeDB(sqlite.InMemoryPath, "", sqlite.ReducedKDFIterationsNumber)
|
|
|
|
s.NoError(err, "creating sqlite db instance")
|
|
|
|
err = sqlite.Migrate(db)
|
|
|
|
s.NoError(err, "protocol migrate")
|
|
|
|
|
|
|
|
key, err := crypto.GenerateKey()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
tm := &testTokenManager{}
|
|
|
|
|
|
|
|
options := []ManagerOption{
|
|
|
|
WithWalletConfig(¶ms.WalletConfig{
|
|
|
|
OpenseaAPIKey: "some-key",
|
|
|
|
}),
|
|
|
|
WithOpenseaClientBuilder(&openseaClientTestBuilder{}),
|
|
|
|
WithTokenManager(tm),
|
|
|
|
}
|
|
|
|
|
|
|
|
m, err := NewManager(key, db, nil, nil, nil, nil, nil, options...)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NoError(m.Start())
|
|
|
|
|
|
|
|
var chainID uint64 = 5
|
|
|
|
contractAddresses := make(map[uint64]string)
|
|
|
|
contractAddresses[chainID] = "0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"
|
|
|
|
var decimals uint64 = 18
|
|
|
|
|
|
|
|
var tokenCriteria = []*protobuf.TokenCriteria{
|
|
|
|
&protobuf.TokenCriteria{
|
|
|
|
ContractAddresses: contractAddresses,
|
|
|
|
Symbol: "STT",
|
|
|
|
Type: protobuf.CommunityTokenType_ERC20,
|
|
|
|
Name: "Status Test Token",
|
|
|
|
Amount: "1.000000000000000000",
|
|
|
|
Decimals: decimals,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
var permissions = []*protobuf.CommunityTokenPermission{
|
|
|
|
&protobuf.CommunityTokenPermission{
|
|
|
|
Id: "some-id",
|
|
|
|
Type: protobuf.CommunityTokenPermission_BECOME_MEMBER,
|
|
|
|
TokenCriteria: tokenCriteria,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
wallets := []gethcommon.Address{gethcommon.HexToAddress("0xD6b912e09E797D291E8D0eA3D3D17F8000e01c32")}
|
|
|
|
|
|
|
|
// Set response to exactly the right one
|
|
|
|
tm.setResponse(chainID, wallets[0], gethcommon.HexToAddress(contractAddresses[chainID]), int64(1*math.Pow(10, float64(decimals))))
|
|
|
|
resp, err := m.checkPermissionToJoin(permissions, wallets, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(resp)
|
|
|
|
s.Require().True(resp.Satisfied)
|
|
|
|
|
|
|
|
// Set response to 0
|
|
|
|
tm.setResponse(chainID, wallets[0], gethcommon.HexToAddress(contractAddresses[chainID]), 0)
|
|
|
|
resp, err = m.checkPermissionToJoin(permissions, wallets, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(resp)
|
|
|
|
s.Require().False(resp.Satisfied)
|
|
|
|
}
|
|
|
|
|
2020-11-18 09:16:51 +00:00
|
|
|
func (s *ManagerSuite) TestCreateCommunity() {
|
2021-05-18 19:32:15 +00:00
|
|
|
|
|
|
|
request := &requests.CreateCommunity{
|
|
|
|
Name: "status",
|
|
|
|
Description: "status community description",
|
|
|
|
Membership: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
2020-11-18 09:16:51 +00:00
|
|
|
}
|
|
|
|
|
2022-08-19 12:51:21 +00:00
|
|
|
community, err := s.manager.CreateCommunity(request, true)
|
2020-11-18 09:16:51 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(community)
|
|
|
|
|
|
|
|
communities, err := s.manager.All()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
// Consider status default community
|
|
|
|
s.Require().Len(communities, 2)
|
|
|
|
|
|
|
|
actualCommunity := communities[0]
|
|
|
|
if bytes.Equal(community.ID(), communities[1].ID()) {
|
|
|
|
actualCommunity = communities[1]
|
|
|
|
}
|
|
|
|
|
|
|
|
s.Require().Equal(community.ID(), actualCommunity.ID())
|
|
|
|
s.Require().Equal(community.PrivateKey(), actualCommunity.PrivateKey())
|
|
|
|
s.Require().True(proto.Equal(community.config.CommunityDescription, actualCommunity.config.CommunityDescription))
|
|
|
|
}
|
2021-05-18 19:32:15 +00:00
|
|
|
|
2022-04-15 18:20:12 +00:00
|
|
|
func (s *ManagerSuite) TestCreateCommunity_WithBanner() {
|
|
|
|
// Generate test image bigger than BannerDim
|
|
|
|
testImage := image.NewRGBA(image.Rect(0, 0, 20, 10))
|
|
|
|
|
|
|
|
tmpTestFilePath := s.T().TempDir() + "/test.png"
|
|
|
|
file, err := os.Create(tmpTestFilePath)
|
|
|
|
s.NoError(err)
|
|
|
|
defer file.Close()
|
|
|
|
|
|
|
|
err = png.Encode(file, testImage)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
request := &requests.CreateCommunity{
|
|
|
|
Name: "with_banner",
|
|
|
|
Description: "community with banner ",
|
|
|
|
Membership: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
|
|
|
Banner: userimages.CroppedImage{
|
|
|
|
ImagePath: tmpTestFilePath,
|
|
|
|
X: 1,
|
|
|
|
Y: 1,
|
|
|
|
Width: 10,
|
|
|
|
Height: 5,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2022-08-19 12:51:21 +00:00
|
|
|
community, err := s.manager.CreateCommunity(request, true)
|
2022-04-15 18:20:12 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(community)
|
|
|
|
|
|
|
|
communities, err := s.manager.All()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
// Consider status default community
|
|
|
|
s.Require().Len(communities, 2)
|
|
|
|
s.Require().Equal(len(community.config.CommunityDescription.Identity.Images), 1)
|
|
|
|
testIdentityImage, isMapContainsKey := community.config.CommunityDescription.Identity.Images[userimages.BannerIdentityName]
|
|
|
|
s.Require().True(isMapContainsKey)
|
|
|
|
s.Require().Positive(len(testIdentityImage.Payload))
|
|
|
|
}
|
|
|
|
|
2021-05-18 19:32:15 +00:00
|
|
|
func (s *ManagerSuite) TestEditCommunity() {
|
|
|
|
//create community
|
|
|
|
createRequest := &requests.CreateCommunity{
|
|
|
|
Name: "status",
|
|
|
|
Description: "status community description",
|
|
|
|
Membership: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
|
|
|
}
|
|
|
|
|
2022-08-19 12:51:21 +00:00
|
|
|
community, err := s.manager.CreateCommunity(createRequest, true)
|
2021-05-18 19:32:15 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(community)
|
|
|
|
|
|
|
|
update := &requests.EditCommunity{
|
|
|
|
CommunityID: community.ID(),
|
|
|
|
CreateCommunity: requests.CreateCommunity{
|
|
|
|
Name: "statusEdited",
|
|
|
|
Description: "status community description edited",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
updatedCommunity, err := s.manager.EditCommunity(update)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(updatedCommunity)
|
|
|
|
|
|
|
|
//ensure updated community successfully stored
|
|
|
|
communities, err := s.manager.All()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
// Consider status default community
|
|
|
|
s.Require().Len(communities, 2)
|
|
|
|
|
|
|
|
storedCommunity := communities[0]
|
|
|
|
if bytes.Equal(community.ID(), communities[1].ID()) {
|
|
|
|
storedCommunity = communities[1]
|
|
|
|
}
|
|
|
|
|
|
|
|
s.Require().Equal(storedCommunity.ID(), updatedCommunity.ID())
|
|
|
|
s.Require().Equal(storedCommunity.PrivateKey(), updatedCommunity.PrivateKey())
|
|
|
|
s.Require().Equal(storedCommunity.config.CommunityDescription.Identity.DisplayName, update.CreateCommunity.Name)
|
|
|
|
s.Require().Equal(storedCommunity.config.CommunityDescription.Identity.Description, update.CreateCommunity.Description)
|
|
|
|
}
|
2022-03-09 09:58:05 +00:00
|
|
|
|
|
|
|
func (s *ManagerSuite) TestGetAdminCommuniesChatIDs() {
|
|
|
|
|
2022-03-21 14:18:36 +00:00
|
|
|
community, _, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(community)
|
|
|
|
|
|
|
|
adminChatIDs, err := s.manager.GetAdminCommunitiesChatIDs()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(adminChatIDs, 1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestStartAndStopTorrentClient() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().NotNil(s.manager.torrentClient)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
_, err = os.Stat(torrentConfig.DataDir)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Equal(s.manager.TorrentClientStarted(), true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestStartHistoryArchiveTasksInterval() {
|
|
|
|
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
community, _, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
interval := 10 * time.Second
|
|
|
|
go s.manager.StartHistoryArchiveTasksInterval(community, interval)
|
|
|
|
// Due to async exec we need to wait a bit until we check
|
|
|
|
// the task count.
|
|
|
|
time.Sleep(5 * time.Second)
|
2023-05-05 16:40:18 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
count := s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 1)
|
2022-03-21 14:18:36 +00:00
|
|
|
|
|
|
|
// We wait another 5 seconds to ensure the first tick has kicked in
|
|
|
|
time.Sleep(5 * time.Second)
|
|
|
|
|
|
|
|
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
|
|
|
s.Require().Error(err)
|
|
|
|
|
|
|
|
s.manager.StopHistoryArchiveTasksInterval(community.ID())
|
|
|
|
s.manager.historyArchiveTasksWaitGroup.Wait()
|
2023-04-25 12:00:17 +00:00
|
|
|
count = s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 0)
|
2022-03-21 14:18:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestStopHistoryArchiveTasksIntervals() {
|
|
|
|
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
community, _, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
interval := 10 * time.Second
|
|
|
|
go s.manager.StartHistoryArchiveTasksInterval(community, interval)
|
|
|
|
|
|
|
|
time.Sleep(2 * time.Second)
|
2023-05-05 16:40:18 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
count := s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 1)
|
|
|
|
|
2022-03-21 14:18:36 +00:00
|
|
|
s.manager.StopHistoryArchiveTasksIntervals()
|
2023-05-05 16:40:18 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
count = s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 0)
|
2022-03-21 14:18:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestStopTorrentClient_ShouldStopHistoryArchiveTasks() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
community, _, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
interval := 10 * time.Second
|
|
|
|
go s.manager.StartHistoryArchiveTasksInterval(community, interval)
|
|
|
|
// Due to async exec we need to wait a bit until we check
|
|
|
|
// the task count.
|
|
|
|
time.Sleep(2 * time.Second)
|
2023-05-05 16:40:18 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
count := s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 1)
|
2022-03-21 14:18:36 +00:00
|
|
|
|
|
|
|
errs := s.manager.StopTorrentClient()
|
|
|
|
s.Require().Len(errs, 0)
|
2023-05-05 16:40:18 +00:00
|
|
|
|
2023-04-25 12:00:17 +00:00
|
|
|
count = s.getHistoryTasksCount()
|
2023-05-05 16:40:18 +00:00
|
|
|
s.Require().Equal(count, 0)
|
2022-03-21 14:18:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() {
|
|
|
|
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 7 days
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
// Partition of 7 days
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
// There are no waku messages in the database so we don't expect
|
|
|
|
// any archives to be created
|
|
|
|
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().Error(err)
|
|
|
|
_, err = os.Stat(s.manager.archiveIndexFile(community.IDString()))
|
|
|
|
s.Require().Error(err)
|
|
|
|
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
|
|
|
s.Require().Error(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 7 days
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
// Partition of 7 days, this should create a single archive
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
// This message is outside of the startDate-endDate range and should not
|
|
|
|
// be part of the archive
|
|
|
|
message3 := buildMessage(endDate.Add(2*time.Hour), topic, []byte{3})
|
|
|
|
|
|
|
|
err = s.manager.StoreWakuMessage(&message1)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
err = s.manager.StoreWakuMessage(&message2)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
err = s.manager.StoreWakuMessage(&message3)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
_, err = os.Stat(s.manager.archiveIndexFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
2022-10-14 09:26:10 +00:00
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 1)
|
|
|
|
|
|
|
|
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
for _, metadata := range index.Archives {
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
|
|
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
|
|
|
|
|
|
|
|
err = proto.Unmarshal(data, archive)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
s.Require().Len(archive.Messages, 2)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchives() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 3 weeks
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 21, 00, 00, 00, 0, time.UTC)
|
|
|
|
// 7 days partition, this should create three archives
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
// We expect 2 archives to be created for startDate - endDate of each
|
|
|
|
// 7 days of data. This message should end up in the second archive
|
|
|
|
message3 := buildMessage(startDate.Add(8*24*time.Hour), topic, []byte{3})
|
|
|
|
// This one should end up in the third archive
|
|
|
|
message4 := buildMessage(startDate.Add(14*24*time.Hour), topic, []byte{4})
|
|
|
|
|
|
|
|
err = s.manager.StoreWakuMessage(&message1)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
err = s.manager.StoreWakuMessage(&message2)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
err = s.manager.StoreWakuMessage(&message3)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
err = s.manager.StoreWakuMessage(&message4)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
2022-10-14 09:26:10 +00:00
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 3)
|
|
|
|
|
|
|
|
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
// First archive has 2 messages
|
|
|
|
// Second archive has 1 message
|
|
|
|
// Third archive has 1 message
|
|
|
|
fromMap := map[uint64]int{
|
|
|
|
uint64(startDate.Unix()): 2,
|
|
|
|
uint64(startDate.Add(partition).Unix()): 1,
|
|
|
|
uint64(startDate.Add(partition * 2).Unix()): 1,
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, metadata := range index.Archives {
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
|
|
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
|
|
|
|
|
|
|
|
err = proto.Unmarshal(data, archive)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(archive.Messages, fromMap[metadata.Metadata.From])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 1 week
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
// 7 days partition, this should create one archive
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
err = s.manager.StoreWakuMessage(&message1)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
2022-10-14 09:26:10 +00:00
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 1)
|
|
|
|
|
|
|
|
// Time range of next week
|
|
|
|
startDate = time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate = time.Date(2020, 1, 14, 00, 00, 00, 0, time.UTC)
|
|
|
|
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
err = s.manager.StoreWakuMessage(&message2)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 2)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 7 days
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
// Partition of 7 days, this should create a single archive
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
// This message is outside of the startDate-endDate range and should not
|
|
|
|
// be part of the archive
|
|
|
|
message3 := buildMessage(endDate.Add(2*time.Hour), topic, []byte{3})
|
|
|
|
|
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3}, topics, startDate, endDate, partition, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
_, err = os.Stat(s.manager.archiveIndexFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 1)
|
|
|
|
|
|
|
|
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
for _, metadata := range index.Archives {
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
|
|
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
|
|
|
|
|
|
|
|
err = proto.Unmarshal(data, archive)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
s.Require().Len(archive.Messages, 2)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldCreateMultipleArchives() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 3 weeks
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 21, 00, 00, 00, 0, time.UTC)
|
|
|
|
// 7 days partition, this should create three archives
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
// We expect 2 archives to be created for startDate - endDate of each
|
|
|
|
// 7 days of data. This message should end up in the second archive
|
|
|
|
message3 := buildMessage(startDate.Add(8*24*time.Hour), topic, []byte{3})
|
|
|
|
// This one should end up in the third archive
|
|
|
|
message4 := buildMessage(startDate.Add(14*24*time.Hour), topic, []byte{4})
|
|
|
|
|
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3, &message4}, topics, startDate, endDate, partition, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 3)
|
|
|
|
|
|
|
|
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
// First archive has 2 messages
|
|
|
|
// Second archive has 1 message
|
|
|
|
// Third archive has 1 message
|
|
|
|
fromMap := map[uint64]int{
|
|
|
|
uint64(startDate.Unix()): 2,
|
|
|
|
uint64(startDate.Add(partition).Unix()): 1,
|
|
|
|
uint64(startDate.Add(partition * 2).Unix()): 1,
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, metadata := range index.Archives {
|
|
|
|
archive := &protobuf.WakuMessageArchive{}
|
|
|
|
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
|
|
|
|
|
|
|
|
err = proto.Unmarshal(data, archive)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(archive.Messages, fromMap[metadata.Metadata.From])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldAppendArchives() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
// Time range of 1 week
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
// 7 days partition, this should create one archive
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
|
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1}, topics, startDate, endDate, partition, false)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 1)
|
|
|
|
|
|
|
|
// Time range of next week
|
|
|
|
startDate = time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate = time.Date(2020, 1, 14, 00, 00, 00, 0, time.UTC)
|
|
|
|
|
|
|
|
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
|
|
|
|
|
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message2}, topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
2022-10-14 09:26:10 +00:00
|
|
|
index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(index.Archives, 2)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
err = s.manager.StoreWakuMessage(&message1)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(s.manager.torrentTasks, 1)
|
|
|
|
|
|
|
|
metaInfoHash := s.manager.torrentTasks[community.IDString()]
|
|
|
|
torrent, ok := s.manager.torrentClient.Torrent(metaInfoHash)
|
|
|
|
defer torrent.Drop()
|
|
|
|
|
|
|
|
s.Require().Equal(ok, true)
|
|
|
|
s.Require().Equal(torrent.Seeding(), true)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() {
|
|
|
|
torrentConfig := buildTorrentConfig()
|
|
|
|
s.manager.SetTorrentConfig(&torrentConfig)
|
|
|
|
|
|
|
|
err := s.manager.StartTorrentClient()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
defer s.manager.StopTorrentClient()
|
|
|
|
|
|
|
|
community, chatID, err := s.buildCommunityWithChat()
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
topic := types.BytesToTopic(transport.ToTopic(chatID))
|
|
|
|
topics := []types.TopicType{topic}
|
|
|
|
|
|
|
|
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
|
|
|
|
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
|
|
|
|
partition := 7 * 24 * time.Hour
|
|
|
|
|
|
|
|
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
|
|
|
|
err = s.manager.StoreWakuMessage(&message1)
|
|
|
|
s.Require().NoError(err)
|
|
|
|
|
feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API
Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.
The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.
This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.
There are only two options to get the messages into the database:
1. Make one write operation with all messages - this slow, takes a long
time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
also slow, takes long but makes the database a bit more responsive as
it's many smaller operations instead of one big one
Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)
Which brings me to the third option.
**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.
This is significantly faster and doesn't block the database.
To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
2022-10-20 14:37:04 +00:00
|
|
|
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
|
2022-03-21 14:18:36 +00:00
|
|
|
s.Require().NoError(err)
|
|
|
|
|
|
|
|
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
|
|
|
s.Require().NoError(err)
|
|
|
|
s.Require().Len(s.manager.torrentTasks, 1)
|
|
|
|
|
|
|
|
metaInfoHash := s.manager.torrentTasks[community.IDString()]
|
|
|
|
|
|
|
|
s.manager.UnseedHistoryArchiveTorrent(community.ID())
|
|
|
|
_, ok := s.manager.torrentClient.Torrent(metaInfoHash)
|
|
|
|
s.Require().Equal(ok, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
func buildTorrentConfig() params.TorrentConfig {
|
|
|
|
torrentConfig := params.TorrentConfig{
|
|
|
|
Enabled: true,
|
|
|
|
DataDir: os.TempDir() + "/archivedata",
|
|
|
|
TorrentDir: os.TempDir() + "/torrents",
|
|
|
|
Port: 9999,
|
|
|
|
}
|
|
|
|
return torrentConfig
|
|
|
|
}
|
|
|
|
|
|
|
|
func buildMessage(timestamp time.Time, topic types.TopicType, hash []byte) types.Message {
|
|
|
|
message := types.Message{
|
|
|
|
Sig: []byte{1},
|
|
|
|
Timestamp: uint32(timestamp.Unix()),
|
|
|
|
Topic: topic,
|
|
|
|
Payload: []byte{1},
|
|
|
|
Padding: []byte{1},
|
|
|
|
Hash: hash,
|
|
|
|
}
|
|
|
|
return message
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *ManagerSuite) buildCommunityWithChat() (*Community, string, error) {
|
2022-03-09 09:58:05 +00:00
|
|
|
createRequest := &requests.CreateCommunity{
|
|
|
|
Name: "status",
|
|
|
|
Description: "status community description",
|
|
|
|
Membership: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
|
|
|
}
|
2022-08-19 12:51:21 +00:00
|
|
|
community, err := s.manager.CreateCommunity(createRequest, true)
|
2022-03-21 14:18:36 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, "", err
|
|
|
|
}
|
2022-03-09 09:58:05 +00:00
|
|
|
chat := &protobuf.CommunityChat{
|
|
|
|
Identity: &protobuf.ChatIdentity{
|
|
|
|
DisplayName: "added-chat",
|
|
|
|
Description: "description",
|
|
|
|
},
|
|
|
|
Permissions: &protobuf.CommunityPermissions{
|
|
|
|
Access: protobuf.CommunityPermissions_NO_MEMBERSHIP,
|
|
|
|
},
|
|
|
|
Members: make(map[string]*protobuf.CommunityMember),
|
|
|
|
}
|
2023-01-26 12:52:43 +00:00
|
|
|
_, changes, err := s.manager.CreateChat(community.ID(), chat, true, "")
|
2022-03-21 14:18:36 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, "", err
|
|
|
|
}
|
2022-03-09 09:58:05 +00:00
|
|
|
|
2022-03-21 14:18:36 +00:00
|
|
|
chatID := ""
|
|
|
|
for cID := range changes.ChatsAdded {
|
|
|
|
chatID = cID
|
|
|
|
break
|
|
|
|
}
|
|
|
|
return community, chatID, nil
|
2022-03-09 09:58:05 +00:00
|
|
|
}
|