package collectibles import ( "context" "encoding/json" "errors" "math/big" "sync/atomic" "time" "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/bigint" walletCommon "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletevent" ) const ( fetchLimit = 50 // Limit number of collectibles we fetch per provider call accountOwnershipUpdateInterval = 60 * time.Minute accountOwnershipUpdateDelayInterval = 30 * time.Second ) type OwnershipState = int type OwnedCollectibles struct { chainID walletCommon.ChainID account common.Address ids []thirdparty.CollectibleUniqueID } type OwnedCollectiblesChangeType = int const ( OwnedCollectiblesChangeTypeAdded OwnedCollectiblesChangeType = iota + 1 OwnedCollectiblesChangeTypeUpdated OwnedCollectiblesChangeTypeRemoved ) type OwnedCollectiblesChange struct { ownedCollectibles OwnedCollectibles changeType OwnedCollectiblesChangeType } type OwnedCollectiblesChangeCb func(OwnedCollectiblesChange) type TransferCb func(common.Address, walletCommon.ChainID, []transfer.Transfer) const ( OwnershipStateIdle OwnershipState = iota + 1 OwnershipStateDelayed OwnershipStateUpdating OwnershipStateError ) type periodicRefreshOwnedCollectiblesCommand struct { chainID walletCommon.ChainID account common.Address manager *Manager ownershipDB *OwnershipDB walletFeed *event.Feed ownedCollectiblesChangeCb OwnedCollectiblesChangeCb group *async.Group state atomic.Value } func newPeriodicRefreshOwnedCollectiblesCommand( manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address, ownedCollectiblesChangeCb OwnedCollectiblesChangeCb) *periodicRefreshOwnedCollectiblesCommand { ret := &periodicRefreshOwnedCollectiblesCommand{ manager: manager, ownershipDB: ownershipDB, walletFeed: walletFeed, chainID: chainID, account: account, ownedCollectiblesChangeCb: ownedCollectiblesChangeCb, } ret.state.Store(OwnershipStateIdle) return ret } func (c *periodicRefreshOwnedCollectiblesCommand) DelayedCommand() async.Command { return async.SingleShotCommand{ Interval: accountOwnershipUpdateDelayInterval, Init: func(ctx context.Context) (err error) { c.state.Store(OwnershipStateDelayed) return nil }, Runable: c.Command(), }.Run } func (c *periodicRefreshOwnedCollectiblesCommand) Command() async.Command { return async.InfiniteCommand{ Interval: accountOwnershipUpdateInterval, Runable: c.Run, }.Run } func (c *periodicRefreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) { return c.loadOwnedCollectibles(ctx) } func (c *periodicRefreshOwnedCollectiblesCommand) GetState() OwnershipState { return c.state.Load().(OwnershipState) } func (c *periodicRefreshOwnedCollectiblesCommand) Stop() { if c.group != nil { c.group.Stop() c.group.Wait() c.group = nil } } func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx context.Context) error { c.group = async.NewGroup(ctx) ownedCollectiblesChangeCh := make(chan OwnedCollectiblesChange) command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, c.chainID, c.account, ownedCollectiblesChangeCh) c.state.Store(OwnershipStateUpdating) defer func() { if command.err != nil { c.state.Store(OwnershipStateError) } else { c.state.Store(OwnershipStateIdle) } }() c.group.Add(command.Command()) select { case ownedCollectiblesChange := <-ownedCollectiblesChangeCh: if c.ownedCollectiblesChangeCb != nil { c.ownedCollectiblesChangeCb(ownedCollectiblesChange) } case <-ctx.Done(): return ctx.Err() case <-c.group.WaitAsync(): return nil } return nil } // Fetches owned collectibles for a ChainID+OwnerAddress combination in chunks // and updates the ownershipDB when all chunks are loaded type loadOwnedCollectiblesCommand struct { chainID walletCommon.ChainID account common.Address manager *Manager ownershipDB *OwnershipDB walletFeed *event.Feed ownedCollectiblesChangeCh chan<- OwnedCollectiblesChange // Not to be set by the caller partialOwnership []thirdparty.CollectibleIDBalance err error } func newLoadOwnedCollectiblesCommand( manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address, ownedCollectiblesChangeCh chan<- OwnedCollectiblesChange) *loadOwnedCollectiblesCommand { return &loadOwnedCollectiblesCommand{ manager: manager, ownershipDB: ownershipDB, walletFeed: walletFeed, chainID: chainID, account: account, ownedCollectiblesChangeCh: ownedCollectiblesChangeCh, } } func (c *loadOwnedCollectiblesCommand) Command() async.Command { return c.Run } func (c *loadOwnedCollectiblesCommand) triggerEvent(eventType walletevent.EventType, chainID walletCommon.ChainID, account common.Address, message string) { c.walletFeed.Send(walletevent.Event{ Type: eventType, ChainID: uint64(chainID), Accounts: []common.Address{ account, }, Message: message, }) } func ownedTokensToTokenBalancesPerContractAddress(ownership []thirdparty.CollectibleIDBalance) thirdparty.TokenBalancesPerContractAddress { ret := make(thirdparty.TokenBalancesPerContractAddress) for _, idBalance := range ownership { balanceBigInt := idBalance.Balance if balanceBigInt == nil { balanceBigInt = &bigint.BigInt{Int: big.NewInt(1)} } balance := thirdparty.TokenBalance{ TokenID: idBalance.ID.TokenID, Balance: balanceBigInt, } ret[idBalance.ID.ContractID.Address] = append(ret[idBalance.ID.ContractID.Address], balance) } return ret } func (c *loadOwnedCollectiblesCommand) sendOwnedCollectiblesChanges(removed, updated, added []thirdparty.CollectibleUniqueID) { if len(removed) > 0 { c.ownedCollectiblesChangeCh <- OwnedCollectiblesChange{ ownedCollectibles: OwnedCollectibles{ chainID: c.chainID, account: c.account, ids: removed, }, changeType: OwnedCollectiblesChangeTypeRemoved, } } if len(updated) > 0 { c.ownedCollectiblesChangeCh <- OwnedCollectiblesChange{ ownedCollectibles: OwnedCollectibles{ chainID: c.chainID, account: c.account, ids: updated, }, changeType: OwnedCollectiblesChangeTypeUpdated, } } if len(added) > 0 { c.ownedCollectiblesChangeCh <- OwnedCollectiblesChange{ ownedCollectibles: OwnedCollectibles{ chainID: c.chainID, account: c.account, ids: added, }, changeType: OwnedCollectiblesChangeTypeAdded, } } } func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) { logutils.ZapLogger().Debug("start loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), ) pageNr := 0 cursor := thirdparty.FetchFromStartCursor providerID := thirdparty.FetchFromAnyProvider start := time.Now() c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, c.chainID, c.account, "") updateMessage := OwnershipUpdateMessage{} lastFetchTimestamp, err := c.ownershipDB.GetOwnershipUpdateTimestamp(c.account, c.chainID) if err != nil { c.err = err } else { initialFetch := lastFetchTimestamp == InvalidTimestamp // Fetch collectibles in chunks for { if walletCommon.ShouldCancel(parent) { c.err = errors.New("context cancelled") break } pageStart := time.Now() logutils.ZapLogger().Debug("start loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), zap.Int("page", pageNr), ) partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(parent, c.chainID, c.account, cursor, fetchLimit, providerID) if err != nil { logutils.ZapLogger().Error("failed loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), zap.Int("page", pageNr), zap.Error(err), ) c.err = err break } logutils.ZapLogger().Debug("partial loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), zap.Int("page", pageNr), zap.Duration("duration", time.Since(pageStart)), zap.Int("found", len(partialOwnership.Items)), ) c.partialOwnership = append(c.partialOwnership, partialOwnership.Items...) pageNr++ cursor = partialOwnership.NextCursor providerID = partialOwnership.Provider finished := cursor == thirdparty.FetchFromStartCursor // Normally, update the DB once we've finished fetching // If this is the first fetch, make partial updates to the client to get a better UX if initialFetch || finished { balances := ownedTokensToTokenBalancesPerContractAddress(c.partialOwnership) updateMessage.Removed, updateMessage.Updated, updateMessage.Added, err = c.ownershipDB.Update(c.chainID, c.account, balances, start.Unix()) if err != nil { logutils.ZapLogger().Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), zap.Error(err), ) c.err = err break } c.sendOwnedCollectiblesChanges(updateMessage.Removed, updateMessage.Updated, updateMessage.Added) } if finished || c.err != nil { break } else if initialFetch { encodedMessage, err := json.Marshal(updateMessage) if err != nil { c.err = err break } c.triggerEvent(EventCollectiblesOwnershipUpdatePartial, c.chainID, c.account, string(encodedMessage)) updateMessage = OwnershipUpdateMessage{} } } } var encodedMessage []byte if c.err == nil { encodedMessage, c.err = json.Marshal(updateMessage) } if c.err != nil { c.triggerEvent(EventCollectiblesOwnershipUpdateFinishedWithError, c.chainID, c.account, c.err.Error()) } else { c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, c.chainID, c.account, string(encodedMessage)) } logutils.ZapLogger().Debug("end loadOwnedCollectiblesCommand", zap.Uint64("chain", uint64(c.chainID)), zap.Stringer("account", c.account), zap.Duration("in", time.Since(start)), ) return nil }