feat(communities): add encryption suppord to archive protocol
This commit is contained in:
parent
a69a59c601
commit
7eb66d09e7
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
"github.com/status-im/status-go/params"
|
"github.com/status-im/status-go/params"
|
||||||
"github.com/status-im/status-go/protocol/common"
|
"github.com/status-im/status-go/protocol/common"
|
||||||
|
"github.com/status-im/status-go/protocol/encryption"
|
||||||
"github.com/status-im/status-go/protocol/ens"
|
"github.com/status-im/status-go/protocol/ens"
|
||||||
"github.com/status-im/status-go/protocol/protobuf"
|
"github.com/status-im/status-go/protocol/protobuf"
|
||||||
"github.com/status-im/status-go/protocol/requests"
|
"github.com/status-im/status-go/protocol/requests"
|
||||||
|
@ -40,10 +41,11 @@ var ErrTorrentTimedout = errors.New("torrent has timed out")
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
persistence *Persistence
|
persistence *Persistence
|
||||||
|
encryptor *encryption.Protocol
|
||||||
ensSubscription chan []*ens.VerificationRecord
|
ensSubscription chan []*ens.VerificationRecord
|
||||||
subscriptions []chan *Subscription
|
subscriptions []chan *Subscription
|
||||||
ensVerifier *ens.Verifier
|
ensVerifier *ens.Verifier
|
||||||
identity *ecdsa.PublicKey
|
identity *ecdsa.PrivateKey
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
stdoutLogger *zap.Logger
|
stdoutLogger *zap.Logger
|
||||||
transport *transport.Transport
|
transport *transport.Transport
|
||||||
|
@ -55,7 +57,7 @@ type Manager struct {
|
||||||
torrentTasks map[string]metainfo.Hash
|
torrentTasks map[string]metainfo.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewManager(identity *ecdsa.PublicKey, db *sql.DB, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
|
func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
|
||||||
if identity == nil {
|
if identity == nil {
|
||||||
return nil, errors.New("empty identity")
|
return nil, errors.New("empty identity")
|
||||||
}
|
}
|
||||||
|
@ -75,6 +77,7 @@ func NewManager(identity *ecdsa.PublicKey, db *sql.DB, logger *zap.Logger, verif
|
||||||
manager := &Manager{
|
manager := &Manager{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
stdoutLogger: stdoutLogger,
|
stdoutLogger: stdoutLogger,
|
||||||
|
encryptor: encryptor,
|
||||||
identity: identity,
|
identity: identity,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
transport: transport,
|
transport: transport,
|
||||||
|
@ -250,7 +253,7 @@ func (m *Manager) publish(subscription *Subscription) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) All() ([]*Community, error) {
|
func (m *Manager) All() ([]*Community, error) {
|
||||||
return m.persistence.AllCommunities(m.identity)
|
return m.persistence.AllCommunities(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
type KnownCommunitiesResponse struct {
|
type KnownCommunitiesResponse struct {
|
||||||
|
@ -285,23 +288,23 @@ func (m *Manager) GetStoredDescriptionForCommunities(communityIDs []types.HexByt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Joined() ([]*Community, error) {
|
func (m *Manager) Joined() ([]*Community, error) {
|
||||||
return m.persistence.JoinedCommunities(m.identity)
|
return m.persistence.JoinedCommunities(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Spectated() ([]*Community, error) {
|
func (m *Manager) Spectated() ([]*Community, error) {
|
||||||
return m.persistence.SpectatedCommunities(m.identity)
|
return m.persistence.SpectatedCommunities(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) JoinedAndPendingCommunitiesWithRequests() ([]*Community, error) {
|
func (m *Manager) JoinedAndPendingCommunitiesWithRequests() ([]*Community, error) {
|
||||||
return m.persistence.JoinedAndPendingCommunitiesWithRequests(m.identity)
|
return m.persistence.JoinedAndPendingCommunitiesWithRequests(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) DeletedCommunities() ([]*Community, error) {
|
func (m *Manager) DeletedCommunities() ([]*Community, error) {
|
||||||
return m.persistence.DeletedCommunities(m.identity)
|
return m.persistence.DeletedCommunities(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Created() ([]*Community, error) {
|
func (m *Manager) Created() ([]*Community, error) {
|
||||||
return m.persistence.CreatedCommunities(m.identity)
|
return m.persistence.CreatedCommunities(&m.identity.PublicKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateCommunity takes a description, generates an ID for it, saves it and return it
|
// CreateCommunity takes a description, generates an ID for it, saves it and return it
|
||||||
|
@ -313,7 +316,7 @@ func (m *Manager) CreateCommunity(request *requests.CreateCommunity, publish boo
|
||||||
}
|
}
|
||||||
|
|
||||||
description.Members = make(map[string]*protobuf.CommunityMember)
|
description.Members = make(map[string]*protobuf.CommunityMember)
|
||||||
description.Members[common.PubkeyToHex(m.identity)] = &protobuf.CommunityMember{Roles: []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL}}
|
description.Members[common.PubkeyToHex(&m.identity.PublicKey)] = &protobuf.CommunityMember{Roles: []protobuf.CommunityMember_Roles{protobuf.CommunityMember_ROLE_ALL}}
|
||||||
|
|
||||||
err = ValidateCommunityDescription(description)
|
err = ValidateCommunityDescription(description)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -332,7 +335,7 @@ func (m *Manager) CreateCommunity(request *requests.CreateCommunity, publish boo
|
||||||
PrivateKey: key,
|
PrivateKey: key,
|
||||||
Logger: m.logger,
|
Logger: m.logger,
|
||||||
Joined: true,
|
Joined: true,
|
||||||
MemberIdentity: m.identity,
|
MemberIdentity: &m.identity.PublicKey,
|
||||||
CommunityDescription: description,
|
CommunityDescription: description,
|
||||||
}
|
}
|
||||||
community, err := New(config)
|
community, err := New(config)
|
||||||
|
@ -430,7 +433,7 @@ func (m *Manager) ExportCommunity(id types.HexBytes) (*ecdsa.PrivateKey, error)
|
||||||
func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
|
func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
|
||||||
communityID := crypto.CompressPubkey(&key.PublicKey)
|
communityID := crypto.CompressPubkey(&key.PublicKey)
|
||||||
|
|
||||||
community, err := m.persistence.GetByID(m.identity, communityID)
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, communityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -445,7 +448,7 @@ func (m *Manager) ImportCommunity(key *ecdsa.PrivateKey) (*Community, error) {
|
||||||
PrivateKey: key,
|
PrivateKey: key,
|
||||||
Logger: m.logger,
|
Logger: m.logger,
|
||||||
Joined: true,
|
Joined: true,
|
||||||
MemberIdentity: m.identity,
|
MemberIdentity: &m.identity.PublicKey,
|
||||||
CommunityDescription: description,
|
CommunityDescription: description,
|
||||||
}
|
}
|
||||||
community, err = New(config)
|
community, err = New(config)
|
||||||
|
@ -730,7 +733,7 @@ func (m *Manager) DeleteCategory(request *requests.DeleteCommunityCategory) (*Co
|
||||||
|
|
||||||
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte) (*CommunityResponse, error) {
|
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte) (*CommunityResponse, error) {
|
||||||
id := crypto.CompressPubkey(signer)
|
id := crypto.CompressPubkey(signer)
|
||||||
community, err := m.persistence.GetByID(m.identity, id)
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -740,7 +743,7 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
||||||
CommunityDescription: description,
|
CommunityDescription: description,
|
||||||
Logger: m.logger,
|
Logger: m.logger,
|
||||||
MarshaledCommunityDescription: payload,
|
MarshaledCommunityDescription: payload,
|
||||||
MemberIdentity: m.identity,
|
MemberIdentity: &m.identity.PublicKey,
|
||||||
ID: signer,
|
ID: signer,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,7 +782,7 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pkString := common.PubkeyToHex(m.identity)
|
pkString := common.PubkeyToHex(&m.identity.PublicKey)
|
||||||
|
|
||||||
// If the community require membership, we set whether we should leave/join the community after a state change
|
// If the community require membership, we set whether we should leave/join the community after a state change
|
||||||
if community.InvitationOnly() || community.OnRequest() || community.AcceptRequestToJoinAutomatically() {
|
if community.InvitationOnly() || community.OnRequest() || community.AcceptRequestToJoinAutomatically() {
|
||||||
|
@ -806,7 +809,7 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
||||||
|
|
||||||
// We mark our requests as completed, though maybe we should mark
|
// We mark our requests as completed, though maybe we should mark
|
||||||
// any request for any user that has been added as completed
|
// any request for any user that has been added as completed
|
||||||
if err := m.markRequestToJoin(m.identity, community); err != nil {
|
if err := m.markRequestToJoin(&m.identity.PublicKey, community); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Check if there's a change and we should be joining
|
// Check if there's a change and we should be joining
|
||||||
|
@ -894,7 +897,7 @@ func (m *Manager) DeclineRequestToJoin(request *requests.DeclineRequestToJoinCom
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoin) (*RequestToJoin, error) {
|
func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoin) (*RequestToJoin, error) {
|
||||||
community, err := m.persistence.GetByID(m.identity, request.CommunityId)
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -937,7 +940,7 @@ func (m *Manager) HandleCommunityRequestToJoin(signer *ecdsa.PublicKey, request
|
||||||
|
|
||||||
func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoinResponse) error {
|
func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey, request *protobuf.CommunityRequestToJoinResponse) error {
|
||||||
|
|
||||||
community, err := m.persistence.GetByID(m.identity, request.CommunityId)
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -976,9 +979,9 @@ func (m *Manager) HandleCommunityRequestToJoinResponse(signer *ecdsa.PublicKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
if request.Accepted {
|
if request.Accepted {
|
||||||
return m.markRequestToJoin(m.identity, community)
|
return m.markRequestToJoin(&m.identity.PublicKey, community)
|
||||||
}
|
}
|
||||||
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(m.identity), community.ID(), RequestToJoinStateDeclined)
|
return m.persistence.SetRequestToJoinState(common.PubkeyToHex(&m.identity.PublicKey), community.ID(), RequestToJoinStateDeclined)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) HandleCommunityRequestToLeave(signer *ecdsa.PublicKey, proto *protobuf.CommunityRequestToLeave) error {
|
func (m *Manager) HandleCommunityRequestToLeave(signer *ecdsa.PublicKey, proto *protobuf.CommunityRequestToLeave) error {
|
||||||
|
@ -1082,7 +1085,7 @@ func (m *Manager) LeaveCommunity(id types.HexBytes) (*Community, error) {
|
||||||
return nil, ErrOrgNotFound
|
return nil, ErrOrgNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
community.RemoveOurselvesFromOrg(m.identity)
|
community.RemoveOurselvesFromOrg(&m.identity.PublicKey)
|
||||||
community.Leave()
|
community.Leave()
|
||||||
|
|
||||||
if err = m.persistence.SaveCommunity(community); err != nil {
|
if err = m.persistence.SaveCommunity(community); err != nil {
|
||||||
|
@ -1238,7 +1241,7 @@ func (m *Manager) BanUserFromCommunity(request *requests.BanUserFromCommunity) (
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) GetByID(id []byte) (*Community, error) {
|
func (m *Manager) GetByID(id []byte) (*Community, error) {
|
||||||
return m.persistence.GetByID(m.identity, id)
|
return m.persistence.GetByID(&m.identity.PublicKey, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
||||||
|
@ -1250,7 +1253,7 @@ func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) RequestToJoin(requester *ecdsa.PublicKey, request *requests.RequestToJoinCommunity) (*Community, *RequestToJoin, error) {
|
func (m *Manager) RequestToJoin(requester *ecdsa.PublicKey, request *requests.RequestToJoinCommunity) (*Community, *RequestToJoin, error) {
|
||||||
community, err := m.persistence.GetByID(m.identity, request.CommunityID)
|
community, err := m.persistence.GetByID(&m.identity.PublicKey, request.CommunityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -1533,9 +1536,9 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex
|
||||||
return lastArchiveEndDateTimestamp, nil
|
return lastArchiveEndDateTimestamp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) error {
|
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
|
||||||
m.UnseedHistoryArchiveTorrent(communityID)
|
m.UnseedHistoryArchiveTorrent(communityID)
|
||||||
_, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition)
|
_, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition, encrypt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1586,7 +1589,7 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
|
||||||
to := time.Unix(ts, 0)
|
to := time.Unix(ts, 0)
|
||||||
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
|
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
|
||||||
|
|
||||||
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval)
|
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.LogStdout("failed to create and seed history archive", zap.Error(err))
|
m.LogStdout("failed to create and seed history archive", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
|
@ -1623,7 +1626,7 @@ type EncodedArchiveData struct {
|
||||||
bytes []byte
|
bytes []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration) ([]string, error) {
|
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
|
||||||
|
|
||||||
from := startDate
|
from := startDate
|
||||||
to := from.Add(partition)
|
to := from.Add(partition)
|
||||||
|
@ -1655,7 +1658,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
|
||||||
|
|
||||||
_, err := os.Stat(indexPath)
|
_, err := os.Stat(indexPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(communityID)
|
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return archiveIDs, err
|
return archiveIDs, err
|
||||||
}
|
}
|
||||||
|
@ -1711,6 +1714,18 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
|
||||||
return archiveIDs, err
|
return archiveIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if encrypt {
|
||||||
|
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive)
|
||||||
|
if err != nil {
|
||||||
|
return archiveIDs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
encodedArchive, err = proto.Marshal(messageSpec.Message)
|
||||||
|
if err != nil {
|
||||||
|
return archiveIDs, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rawSize := len(encodedArchive)
|
rawSize := len(encodedArchive)
|
||||||
padding := 0
|
padding := 0
|
||||||
size := 0
|
size := 0
|
||||||
|
@ -1768,6 +1783,17 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
|
||||||
return archiveIDs, err
|
return archiveIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if encrypt {
|
||||||
|
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes)
|
||||||
|
if err != nil {
|
||||||
|
return archiveIDs, err
|
||||||
|
}
|
||||||
|
indexBytes, err = proto.Marshal(messageSpec.Message)
|
||||||
|
if err != nil {
|
||||||
|
return archiveIDs, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
|
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return archiveIDs, err
|
return archiveIDs, err
|
||||||
|
@ -1782,7 +1808,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
|
||||||
AnnounceList: defaultAnnounceList,
|
AnnounceList: defaultAnnounceList,
|
||||||
}
|
}
|
||||||
metaInfo.SetDefaults()
|
metaInfo.SetDefaults()
|
||||||
metaInfo.CreatedBy = common.PubkeyToHex(m.identity)
|
metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey)
|
||||||
|
|
||||||
info := metainfo.Info{
|
info := metainfo.Info{
|
||||||
PieceLength: int64(pieceLength),
|
PieceLength: int64(pieceLength),
|
||||||
|
@ -1951,7 +1977,7 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
|
||||||
for {
|
for {
|
||||||
<-ticker.C
|
<-ticker.C
|
||||||
if indexFile.BytesCompleted() == indexFile.Length() {
|
if indexFile.BytesCompleted() == indexFile.Length() {
|
||||||
index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
|
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1964,7 +1990,7 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
|
||||||
archiveHashes = append(archiveHashes, &archiveMetadata{hash: hash, from: metadata.Metadata.From})
|
archiveHashes = append(archiveHashes, &archiveMetadata{hash: hash, from: metadata.Metadata.From})
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Sort(archiveHashes)
|
sort.Sort(sort.Reverse(archiveHashes))
|
||||||
|
|
||||||
for _, hd := range archiveHashes {
|
for _, hd := range archiveHashes {
|
||||||
hash := hd.hash
|
hash := hd.hash
|
||||||
|
@ -2034,7 +2060,7 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
|
||||||
func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) {
|
func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) {
|
||||||
id := communityID.String()
|
id := communityID.String()
|
||||||
|
|
||||||
index, err := m.LoadHistoryArchiveIndexFromFile(communityID)
|
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -2053,10 +2079,31 @@ func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes,
|
||||||
|
|
||||||
err := proto.Unmarshal(data, archive)
|
err := proto.Unmarshal(data, archive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.Debug("Failed to unmarshal WakuMessageArchive", zap.Error(err))
|
// The archive data might eb encrypted so we try to decrypt instead first
|
||||||
|
var protocolMessage encryption.ProtocolMessage
|
||||||
|
err := proto.Unmarshal(data, &protocolMessage)
|
||||||
|
if err != nil {
|
||||||
|
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pk, err := crypto.DecompressPubkey(communityID)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
|
||||||
|
if err != nil {
|
||||||
|
m.LogStdout("failed to decrypt message archive", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
|
||||||
|
if err != nil {
|
||||||
|
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, message := range archive.Messages {
|
for _, message := range archive.Messages {
|
||||||
filter := m.transport.FilterByTopic(message.Topic)
|
filter := m.transport.FilterByTopic(message.Topic)
|
||||||
if filter != nil {
|
if filter != nil {
|
||||||
|
@ -2123,7 +2170,7 @@ func (m *Manager) createWakuMessageArchive(from time.Time, to time.Time, message
|
||||||
return wakuMessageArchive
|
return wakuMessageArchive
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
|
func (m *Manager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
|
||||||
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
|
||||||
|
|
||||||
indexPath := m.archiveIndexFile(communityID.String())
|
indexPath := m.archiveIndexFile(communityID.String())
|
||||||
|
@ -2136,6 +2183,28 @@ func (m *Manager) LoadHistoryArchiveIndexFromFile(communityID types.HexBytes) (*
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 {
|
||||||
|
// This means we're dealing with an encrypted index file, so we have to decrypt it first
|
||||||
|
var protocolMessage encryption.ProtocolMessage
|
||||||
|
err := proto.Unmarshal(indexData, &protocolMessage)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pk, err := crypto.DecompressPubkey(communityID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return wakuMessageArchiveIndexProto, nil
|
return wakuMessageArchiveIndexProto, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (s *ManagerSuite) SetupTest() {
|
||||||
key, err := crypto.GenerateKey()
|
key, err := crypto.GenerateKey()
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
m, err := NewManager(&key.PublicKey, db, nil, nil, nil, nil)
|
m, err := NewManager(key, db, nil, nil, nil, nil, nil)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().NoError(m.Start())
|
s.Require().NoError(m.Start())
|
||||||
s.manager = m
|
s.manager = m
|
||||||
|
@ -275,7 +275,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() {
|
||||||
// Partition of 7 days
|
// Partition of 7 days
|
||||||
partition := 7 * 24 * time.Hour
|
partition := 7 * 24 * time.Hour
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
// There are no waku messages in the database so we don't expect
|
// There are no waku messages in the database so we don't expect
|
||||||
|
@ -317,7 +317,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() {
|
||||||
err = s.manager.StoreWakuMessage(&message3)
|
err = s.manager.StoreWakuMessage(&message3)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
|
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
|
||||||
|
@ -327,7 +327,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() {
|
||||||
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID())
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Len(index.Archives, 1)
|
s.Require().Len(index.Archives, 1)
|
||||||
|
|
||||||
|
@ -378,10 +378,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchi
|
||||||
err = s.manager.StoreWakuMessage(&message4)
|
err = s.manager.StoreWakuMessage(&message4)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID())
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Len(index.Archives, 3)
|
s.Require().Len(index.Archives, 3)
|
||||||
|
|
||||||
|
@ -427,10 +427,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() {
|
||||||
err = s.manager.StoreWakuMessage(&message1)
|
err = s.manager.StoreWakuMessage(&message1)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
index, err := s.manager.LoadHistoryArchiveIndexFromFile(community.ID())
|
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Len(index.Archives, 1)
|
s.Require().Len(index.Archives, 1)
|
||||||
|
|
||||||
|
@ -442,10 +442,10 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() {
|
||||||
err = s.manager.StoreWakuMessage(&message2)
|
err = s.manager.StoreWakuMessage(&message2)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
index, err = s.manager.LoadHistoryArchiveIndexFromFile(community.ID())
|
index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
s.Require().Len(index.Archives, 2)
|
s.Require().Len(index.Archives, 2)
|
||||||
}
|
}
|
||||||
|
@ -472,7 +472,7 @@ func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() {
|
||||||
err = s.manager.StoreWakuMessage(&message1)
|
err = s.manager.StoreWakuMessage(&message1)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
||||||
|
@ -509,7 +509,7 @@ func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() {
|
||||||
err = s.manager.StoreWakuMessage(&message1)
|
err = s.manager.StoreWakuMessage(&message1)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition)
|
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
|
||||||
|
|
|
@ -400,7 +400,7 @@ func NewMessenger(
|
||||||
|
|
||||||
ensVerifier := ens.New(node, logger, transp, database, c.verifyENSURL, c.verifyENSContractAddress)
|
ensVerifier := ens.New(node, logger, transp, database, c.verifyENSURL, c.verifyENSContractAddress)
|
||||||
|
|
||||||
communitiesManager, err := communities.NewManager(&identity.PublicKey, database, logger, ensVerifier, transp, c.torrentConfig)
|
communitiesManager, err := communities.NewManager(identity, database, encryptionProtocol, logger, ensVerifier, transp, c.torrentConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1750,7 +1750,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
|
||||||
|
|
||||||
m.communitiesManager.LogStdout("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
|
m.communitiesManager.LogStdout("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
|
||||||
time.AfterFunc(timeToNextInterval, func() {
|
time.AfterFunc(timeToNextInterval, func() {
|
||||||
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval)
|
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
|
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -1760,7 +1760,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
|
||||||
// Looks like the last archive was generated more than `interval`
|
// Looks like the last archive was generated more than `interval`
|
||||||
// ago, so lets create a new archive now and then schedule the archive
|
// ago, so lets create a new archive now and then schedule the archive
|
||||||
// creation loop
|
// creation loop
|
||||||
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval)
|
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
|
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue