feat: check member permission funds periodically
This adds a periodic member permission check for every admin community such that member funds are checked every hour.
This commit is contained in:
parent
128f82df09
commit
5ec9e1ac46
|
@ -49,27 +49,30 @@ var pieceLength = 100 * 1024
|
||||||
|
|
||||||
const maxArchiveSizeInBytes = 30000000
|
const maxArchiveSizeInBytes = 30000000
|
||||||
|
|
||||||
|
var memberPermissionsCheckInterval = 1 * time.Hour
|
||||||
|
|
||||||
var ErrTorrentTimedout = errors.New("torrent has timed out")
|
var ErrTorrentTimedout = errors.New("torrent has timed out")
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
persistence *Persistence
|
persistence *Persistence
|
||||||
encryptor *encryption.Protocol
|
encryptor *encryption.Protocol
|
||||||
ensSubscription chan []*ens.VerificationRecord
|
ensSubscription chan []*ens.VerificationRecord
|
||||||
subscriptions []chan *Subscription
|
subscriptions []chan *Subscription
|
||||||
ensVerifier *ens.Verifier
|
ensVerifier *ens.Verifier
|
||||||
identity *ecdsa.PrivateKey
|
identity *ecdsa.PrivateKey
|
||||||
accountsManager *account.GethManager
|
accountsManager *account.GethManager
|
||||||
tokenManager *token.Manager
|
tokenManager *token.Manager
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
stdoutLogger *zap.Logger
|
stdoutLogger *zap.Logger
|
||||||
transport *transport.Transport
|
transport *transport.Transport
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
torrentConfig *params.TorrentConfig
|
torrentConfig *params.TorrentConfig
|
||||||
torrentClient *torrent.Client
|
torrentClient *torrent.Client
|
||||||
historyArchiveTasksWaitGroup sync.WaitGroup
|
historyArchiveTasksWaitGroup sync.WaitGroup
|
||||||
historyArchiveTasks map[string]chan struct{}
|
historyArchiveTasks map[string]chan struct{}
|
||||||
torrentTasks map[string]metainfo.Hash
|
periodicMemberPermissionsTasks map[string]chan struct{}
|
||||||
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
|
torrentTasks map[string]metainfo.Hash
|
||||||
|
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
|
||||||
}
|
}
|
||||||
|
|
||||||
type HistoryArchiveDownloadTask struct {
|
type HistoryArchiveDownloadTask struct {
|
||||||
|
@ -134,16 +137,17 @@ func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Pr
|
||||||
}
|
}
|
||||||
|
|
||||||
manager := &Manager{
|
manager := &Manager{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
stdoutLogger: stdoutLogger,
|
stdoutLogger: stdoutLogger,
|
||||||
encryptor: encryptor,
|
encryptor: encryptor,
|
||||||
identity: identity,
|
identity: identity,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
transport: transport,
|
transport: transport,
|
||||||
torrentConfig: torrentConfig,
|
torrentConfig: torrentConfig,
|
||||||
historyArchiveTasks: make(map[string]chan struct{}),
|
historyArchiveTasks: make(map[string]chan struct{}),
|
||||||
torrentTasks: make(map[string]metainfo.Hash),
|
periodicMemberPermissionsTasks: make(map[string]chan struct{}),
|
||||||
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
|
torrentTasks: make(map[string]metainfo.Hash),
|
||||||
|
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
|
||||||
persistence: &Persistence{
|
persistence: &Persistence{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
db: db,
|
db: db,
|
||||||
|
@ -506,6 +510,13 @@ func (m *Manager) CreateCommunityTokenPermission(request *requests.CreateCommuni
|
||||||
|
|
||||||
m.publish(&Subscription{Community: community})
|
m.publish(&Subscription{Community: community})
|
||||||
|
|
||||||
|
// check existing member permission once, then check periodically
|
||||||
|
err = m.checkMemberPermissions(community.ID())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
go m.CheckMemberPermissionsPeriodically(community.ID())
|
||||||
|
|
||||||
return community, changes, nil
|
return community, changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -537,45 +548,83 @@ func (m *Manager) EditCommunityTokenPermission(request *requests.EditCommunityTo
|
||||||
//
|
//
|
||||||
// We do this in a separate routine to not block
|
// We do this in a separate routine to not block
|
||||||
// this function
|
// this function
|
||||||
go func() {
|
if tokenPermission.Type == protobuf.CommunityTokenPermission_BECOME_MEMBER {
|
||||||
if tokenPermission.Type == protobuf.CommunityTokenPermission_BECOME_MEMBER {
|
go func() {
|
||||||
becomeMemberPermissions := community.TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_MEMBER)
|
err := m.checkMemberPermissions(community.ID())
|
||||||
|
if err != nil {
|
||||||
for memberKey, member := range community.Members() {
|
m.logger.Debug("failed to check member permissions", zap.Error(err))
|
||||||
if memberKey == common.PubkeyToHex(&m.identity.PublicKey) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
walletAddresses := make([]gethcommon.Address, 0)
|
|
||||||
for _, walletAddress := range member.WalletAccounts {
|
|
||||||
walletAddresses = append(walletAddresses, gethcommon.HexToAddress(walletAddress))
|
|
||||||
}
|
|
||||||
|
|
||||||
hasPermission, err := m.checkPermissionToJoin(becomeMemberPermissions, walletAddresses)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Debug("failed to check permission to join", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !hasPermission {
|
|
||||||
pk, err := common.HexToPubkey(memberKey)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Debug("failed to convert hex key to pubkey", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
_, err = community.RemoveUserFromOrg(pk)
|
|
||||||
if err != nil {
|
|
||||||
m.logger.Debug("failed to remove member from community", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
m.publish(&Subscription{Community: community})
|
}()
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
|
||||||
return community, changes, nil
|
return community, changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) checkMemberPermissions(communityID types.HexBytes) error {
|
||||||
|
community, err := m.GetByID(communityID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
becomeMemberPermissions := community.TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_MEMBER)
|
||||||
|
|
||||||
|
if len(becomeMemberPermissions) > 0 {
|
||||||
|
for memberKey, member := range community.Members() {
|
||||||
|
if memberKey == common.PubkeyToHex(&m.identity.PublicKey) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
walletAddresses := make([]gethcommon.Address, 0)
|
||||||
|
for _, walletAddress := range member.WalletAccounts {
|
||||||
|
walletAddresses = append(walletAddresses, gethcommon.HexToAddress(walletAddress))
|
||||||
|
}
|
||||||
|
|
||||||
|
hasPermission, err := m.checkPermissionToJoin(becomeMemberPermissions, walletAddresses)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasPermission {
|
||||||
|
pk, err := common.HexToPubkey(memberKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = community.RemoveUserFromOrg(pk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.publish(&Subscription{Community: community})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) CheckMemberPermissionsPeriodically(communityID types.HexBytes) {
|
||||||
|
|
||||||
|
if _, exists := m.periodicMemberPermissionsTasks[communityID.String()]; exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel := make(chan struct{})
|
||||||
|
m.periodicMemberPermissionsTasks[communityID.String()] = cancel
|
||||||
|
|
||||||
|
ticker := time.NewTicker(memberPermissionsCheckInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
err := m.checkMemberPermissions(communityID)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Debug("failed to check member permissions", zap.Error(err))
|
||||||
|
}
|
||||||
|
case <-cancel:
|
||||||
|
delete(m.periodicMemberPermissionsTasks, communityID.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) DeleteCommunityTokenPermission(request *requests.DeleteCommunityTokenPermission) (*Community, *CommunityChanges, error) {
|
func (m *Manager) DeleteCommunityTokenPermission(request *requests.DeleteCommunityTokenPermission) (*Community, *CommunityChanges, error) {
|
||||||
community, err := m.GetByID(request.CommunityID)
|
community, err := m.GetByID(request.CommunityID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -595,6 +644,14 @@ func (m *Manager) DeleteCommunityTokenPermission(request *requests.DeleteCommuni
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if there's stil BECOME_MEMBER permissions,
|
||||||
|
// if not we can stop checking token criteria on-chain
|
||||||
|
// for members
|
||||||
|
becomeMemberPermissions := community.TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_MEMBER)
|
||||||
|
if cancel, exists := m.periodicMemberPermissionsTasks[community.IDString()]; exists && len(becomeMemberPermissions) == 0 {
|
||||||
|
close(cancel)
|
||||||
|
}
|
||||||
|
|
||||||
m.publish(&Subscription{Community: community})
|
m.publish(&Subscription{Community: community})
|
||||||
|
|
||||||
return community, changes, nil
|
return community, changes, nil
|
||||||
|
|
|
@ -729,6 +729,12 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
|
||||||
<-available
|
<-available
|
||||||
m.InitHistoryArchiveTasks(adminCommunities)
|
m.InitHistoryArchiveTasks(adminCommunities)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
for _, c := range adminCommunities {
|
||||||
|
if c.Joined() {
|
||||||
|
go m.communitiesManager.CheckMemberPermissionsPeriodically(c.ID())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue