122 lines
3.5 KiB
Go
122 lines
3.5 KiB
Go
package storenodes
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/services/mailservers"
|
|
)
|
|
|
|
var (
|
|
ErrNotFound = errors.New("not found")
|
|
)
|
|
|
|
// CommunityStorenodes has methods to handle the storenodes for a community
|
|
type CommunityStorenodes struct {
|
|
storenodesByCommunityIDMutex *sync.RWMutex
|
|
storenodesByCommunityID map[string]storenodesData
|
|
|
|
storenodesDB *Database
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewCommunityStorenodes(storenodesDB *Database, logger *zap.Logger) *CommunityStorenodes {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
return &CommunityStorenodes{
|
|
storenodesByCommunityIDMutex: &sync.RWMutex{},
|
|
storenodesByCommunityID: make(map[string]storenodesData),
|
|
storenodesDB: storenodesDB,
|
|
logger: logger.With(zap.Namespace("CommunityStorenodes")),
|
|
}
|
|
}
|
|
|
|
type storenodesData struct {
|
|
storenodes []Storenode
|
|
}
|
|
|
|
// GetStorenodeByCommunityID returns the active storenode for a community
|
|
func (m *CommunityStorenodes) GetStorenodeByCommunityID(communityID string) (mailservers.Mailserver, error) {
|
|
m.storenodesByCommunityIDMutex.RLock()
|
|
defer m.storenodesByCommunityIDMutex.RUnlock()
|
|
|
|
msData, ok := m.storenodesByCommunityID[communityID]
|
|
if !ok || len(msData.storenodes) == 0 {
|
|
return mailservers.Mailserver{}, ErrNotFound
|
|
}
|
|
return toMailserver(msData.storenodes[0]), nil
|
|
}
|
|
|
|
func (m *CommunityStorenodes) IsCommunityStoreNode(peerID peer.ID) bool {
|
|
m.storenodesByCommunityIDMutex.RLock()
|
|
defer m.storenodesByCommunityIDMutex.RUnlock()
|
|
|
|
for _, data := range m.storenodesByCommunityID {
|
|
for _, snode := range data.storenodes {
|
|
commStorenodeID, err := utils.GetPeerID(snode.Address)
|
|
if err == nil {
|
|
if commStorenodeID == peerID {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *CommunityStorenodes) HasStorenodeSetup(communityID string) bool {
|
|
m.storenodesByCommunityIDMutex.RLock()
|
|
defer m.storenodesByCommunityIDMutex.RUnlock()
|
|
|
|
msData, ok := m.storenodesByCommunityID[communityID]
|
|
return ok && len(msData.storenodes) > 0
|
|
}
|
|
|
|
// ReloadFromDB loads or reloads the mailservers from the database (on adding/deleting mailservers)
|
|
func (m *CommunityStorenodes) ReloadFromDB() error {
|
|
if m.storenodesDB == nil {
|
|
return nil
|
|
}
|
|
m.storenodesByCommunityIDMutex.Lock()
|
|
defer m.storenodesByCommunityIDMutex.Unlock()
|
|
dbNodes, err := m.storenodesDB.getAll()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// overwrite the in-memory storenodes
|
|
m.storenodesByCommunityID = make(map[string]storenodesData)
|
|
for _, node := range dbNodes {
|
|
communityID := node.CommunityID.String()
|
|
if _, ok := m.storenodesByCommunityID[communityID]; !ok {
|
|
m.storenodesByCommunityID[communityID] = storenodesData{}
|
|
}
|
|
data := m.storenodesByCommunityID[communityID]
|
|
data.storenodes = append(data.storenodes, node)
|
|
m.storenodesByCommunityID[communityID] = data
|
|
}
|
|
m.logger.Debug("loaded communities storenodes", zap.Int("count", len(dbNodes)))
|
|
return nil
|
|
}
|
|
|
|
func (m *CommunityStorenodes) UpdateStorenodesInDB(communityID types.HexBytes, snodes []Storenode, clock uint64) error {
|
|
if err := m.storenodesDB.syncSave(communityID, snodes, clock); err != nil {
|
|
return err
|
|
}
|
|
if err := m.ReloadFromDB(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *CommunityStorenodes) GetStorenodesFromDB(communityID types.HexBytes) ([]Storenode, error) {
|
|
return m.storenodesDB.getByCommunityID(communityID)
|
|
}
|