feat: emit event on received community collectibles
This commit is contained in:
parent
778753bb57
commit
2a5327d8d2
|
@ -23,6 +23,14 @@ const (
|
|||
|
||||
type OwnershipState = int
|
||||
|
||||
type OwnedCollectibles struct {
|
||||
chainID walletCommon.ChainID
|
||||
account common.Address
|
||||
ids []thirdparty.CollectibleUniqueID
|
||||
}
|
||||
|
||||
type OwnedCollectiblesCb func(OwnedCollectibles)
|
||||
|
||||
const (
|
||||
OwnershipStateIdle OwnershipState = iota + 1
|
||||
OwnershipStateDelayed
|
||||
|
@ -31,23 +39,31 @@ const (
|
|||
)
|
||||
|
||||
type periodicRefreshOwnedCollectiblesCommand struct {
|
||||
chainID walletCommon.ChainID
|
||||
account common.Address
|
||||
manager *Manager
|
||||
ownershipDB *OwnershipDB
|
||||
walletFeed *event.Feed
|
||||
chainID walletCommon.ChainID
|
||||
account common.Address
|
||||
manager *Manager
|
||||
ownershipDB *OwnershipDB
|
||||
walletFeed *event.Feed
|
||||
receivedCollectiblesCb OwnedCollectiblesCb
|
||||
|
||||
group *async.Group
|
||||
state atomic.Value
|
||||
}
|
||||
|
||||
func newPeriodicRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address) *periodicRefreshOwnedCollectiblesCommand {
|
||||
func newPeriodicRefreshOwnedCollectiblesCommand(
|
||||
manager *Manager,
|
||||
ownershipDB *OwnershipDB,
|
||||
walletFeed *event.Feed,
|
||||
chainID walletCommon.ChainID,
|
||||
account common.Address,
|
||||
receivedCollectiblesCb OwnedCollectiblesCb) *periodicRefreshOwnedCollectiblesCommand {
|
||||
ret := &periodicRefreshOwnedCollectiblesCommand{
|
||||
manager: manager,
|
||||
ownershipDB: ownershipDB,
|
||||
walletFeed: walletFeed,
|
||||
chainID: chainID,
|
||||
account: account,
|
||||
manager: manager,
|
||||
ownershipDB: ownershipDB,
|
||||
walletFeed: walletFeed,
|
||||
chainID: chainID,
|
||||
account: account,
|
||||
receivedCollectiblesCb: receivedCollectiblesCb,
|
||||
}
|
||||
ret.state.Store(OwnershipStateIdle)
|
||||
return ret
|
||||
|
@ -89,7 +105,9 @@ func (c *periodicRefreshOwnedCollectiblesCommand) Stop() {
|
|||
|
||||
func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx context.Context) error {
|
||||
c.group = async.NewGroup(ctx)
|
||||
command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, c.chainID, c.account)
|
||||
|
||||
receivedCollectiblesCh := make(chan OwnedCollectibles)
|
||||
command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, c.chainID, c.account, receivedCollectiblesCh)
|
||||
|
||||
c.state.Store(OwnershipStateUpdating)
|
||||
defer func() {
|
||||
|
@ -103,9 +121,14 @@ func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx cont
|
|||
c.group.Add(command.Command())
|
||||
|
||||
select {
|
||||
case ownedCollectibles := <-receivedCollectiblesCh:
|
||||
if c.receivedCollectiblesCb != nil {
|
||||
c.receivedCollectiblesCb(ownedCollectibles)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-c.group.WaitAsync():
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -114,24 +137,32 @@ func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx cont
|
|||
// Fetches owned collectibles for a ChainID+OwnerAddress combination in chunks
|
||||
// and updates the ownershipDB when all chunks are loaded
|
||||
type loadOwnedCollectiblesCommand struct {
|
||||
chainID walletCommon.ChainID
|
||||
account common.Address
|
||||
manager *Manager
|
||||
ownershipDB *OwnershipDB
|
||||
walletFeed *event.Feed
|
||||
chainID walletCommon.ChainID
|
||||
account common.Address
|
||||
manager *Manager
|
||||
ownershipDB *OwnershipDB
|
||||
walletFeed *event.Feed
|
||||
receivedCollectiblesCh chan<- OwnedCollectibles
|
||||
|
||||
// Not to be set by the caller
|
||||
partialOwnership []thirdparty.CollectibleUniqueID
|
||||
err error
|
||||
}
|
||||
|
||||
func newLoadOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address) *loadOwnedCollectiblesCommand {
|
||||
func newLoadOwnedCollectiblesCommand(
|
||||
manager *Manager,
|
||||
ownershipDB *OwnershipDB,
|
||||
walletFeed *event.Feed,
|
||||
chainID walletCommon.ChainID,
|
||||
account common.Address,
|
||||
receivedCollectiblesCh chan<- OwnedCollectibles) *loadOwnedCollectiblesCommand {
|
||||
return &loadOwnedCollectiblesCommand{
|
||||
manager: manager,
|
||||
ownershipDB: ownershipDB,
|
||||
walletFeed: walletFeed,
|
||||
chainID: chainID,
|
||||
account: account,
|
||||
manager: manager,
|
||||
ownershipDB: ownershipDB,
|
||||
walletFeed: walletFeed,
|
||||
chainID: chainID,
|
||||
account: account,
|
||||
receivedCollectiblesCh: receivedCollectiblesCh,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -196,11 +227,23 @@ func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) {
|
|||
// Normally, update the DB once we've finished fetching
|
||||
// If this is the first fetch, make partial updates to the client to get a better UX
|
||||
if initialFetch || finished {
|
||||
receivedIDs, err := c.ownershipDB.GetIDsNotInDB(c.chainID, c.account, c.partialOwnership)
|
||||
if err != nil {
|
||||
log.Error("failed GetIDsNotInDB in processOwnedIDs", "chain", c.chainID, "account", c.account, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership, start.Unix())
|
||||
if err != nil {
|
||||
log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err)
|
||||
c.err = err
|
||||
}
|
||||
|
||||
c.receivedCollectiblesCh <- OwnedCollectibles{
|
||||
chainID: c.chainID,
|
||||
account: c.account,
|
||||
ids: receivedIDs,
|
||||
}
|
||||
}
|
||||
|
||||
if finished || c.err != nil {
|
||||
|
|
|
@ -3,6 +3,7 @@ package collectibles
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -221,6 +222,7 @@ func (c *Controller) startPeriodicalOwnershipFetchForAccountAndChainID(address c
|
|||
c.walletFeed,
|
||||
chainID,
|
||||
address,
|
||||
c.notifyCommunityCollectiblesReceived,
|
||||
)
|
||||
|
||||
c.commands[address][chainID] = command
|
||||
|
@ -393,3 +395,53 @@ func (c *Controller) stopSettingsWatcher() {
|
|||
c.settingsWatcher = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) notifyCommunityCollectiblesReceived(ownedCollectibles OwnedCollectibles) {
|
||||
collectiblesData, err := c.manager.FetchAssetsByCollectibleUniqueID(ownedCollectibles.ids)
|
||||
if err != nil {
|
||||
log.Error("Error fetching collectibles data", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
communityCollectibles := make([]CommunityCollectibleHeader, 0, len(collectiblesData))
|
||||
for _, collectibleData := range collectiblesData {
|
||||
collectibleID := collectibleData.CollectibleData.ID
|
||||
communityID := collectibleData.CollectibleData.CommunityID
|
||||
|
||||
if communityID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
communityInfo, err := c.manager.FetchCollectibleCommunityInfo(communityID, collectibleID)
|
||||
if err != nil {
|
||||
log.Error("Error fetching community info", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
header := CommunityCollectibleHeader{
|
||||
ID: collectibleID,
|
||||
Name: collectibleData.CollectibleData.Name,
|
||||
CommunityHeader: communityInfoToHeader(*communityInfo),
|
||||
}
|
||||
|
||||
communityCollectibles = append(communityCollectibles, header)
|
||||
}
|
||||
|
||||
if len(communityCollectibles) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
encodedMessage, err := json.Marshal(communityCollectibles)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.walletFeed.Send(walletevent.Event{
|
||||
Type: EventCommunityCollectiblesReceived,
|
||||
ChainID: uint64(ownedCollectibles.chainID),
|
||||
Accounts: []common.Address{
|
||||
ownedCollectibles.account,
|
||||
},
|
||||
Message: string(encodedMessage),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -77,6 +77,42 @@ func updateAddressOwnershipTimestamp(creator sqlite.StatementCreator, ownerAddre
|
|||
return err
|
||||
}
|
||||
|
||||
// Returns the list of new IDs when comparing the given list of IDs with the ones in the DB.
|
||||
// Call before Update for the result to be useful.
|
||||
func (o *OwnershipDB) GetIDsNotInDB(
|
||||
chainID w_common.ChainID,
|
||||
ownerAddress common.Address,
|
||||
newIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.CollectibleUniqueID, error) {
|
||||
ret := make([]thirdparty.CollectibleUniqueID, 0, len(newIDs))
|
||||
|
||||
exists, err := o.db.Prepare(`SELECT EXISTS (
|
||||
SELECT 1 FROM collectibles_ownership_cache
|
||||
WHERE chain_id=? AND contract_address=? AND token_id=? AND owner_address=?
|
||||
)`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, id := range newIDs {
|
||||
row := exists.QueryRow(
|
||||
id.ContractID.ChainID,
|
||||
id.ContractID.Address,
|
||||
(*bigint.SQLBigIntBytes)(id.TokenID.Int),
|
||||
ownerAddress,
|
||||
)
|
||||
var exists bool
|
||||
err = row.Scan(&exists)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
ret = append(ret, id)
|
||||
}
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (o *OwnershipDB) Update(chainID w_common.ChainID, ownerAddress common.Address, collectibles []thirdparty.CollectibleUniqueID, timestamp int64) (err error) {
|
||||
var (
|
||||
tx *sql.Tx
|
||||
|
|
|
@ -26,6 +26,7 @@ const (
|
|||
EventCollectiblesOwnershipUpdatePartial walletevent.EventType = "wallet-collectibles-ownership-update-partial"
|
||||
EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished"
|
||||
EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error"
|
||||
EventCommunityCollectiblesReceived walletevent.EventType = "wallet-collectibles-community-collectibles-received"
|
||||
|
||||
EventOwnedCollectiblesFilteringDone walletevent.EventType = "wallet-owned-collectibles-filtering-done"
|
||||
EventGetCollectiblesDetailsDone walletevent.EventType = "wallet-get-collectibles-details-done"
|
||||
|
@ -258,12 +259,8 @@ func (s *Service) fullCollectiblesDataToHeaders(data []thirdparty.FullCollectibl
|
|||
return nil, err
|
||||
}
|
||||
|
||||
header.CommunityHeader = &CommunityHeader{
|
||||
CommunityID: communityInfo.CommunityID,
|
||||
CommunityName: communityInfo.CommunityName,
|
||||
CommunityColor: communityInfo.CommunityColor,
|
||||
PrivilegesLevel: communityInfo.PrivilegesLevel,
|
||||
}
|
||||
communityHeader := communityInfoToHeader(*communityInfo)
|
||||
header.CommunityHeader = &communityHeader
|
||||
}
|
||||
|
||||
res = append(res, header)
|
||||
|
|
|
@ -42,6 +42,12 @@ type CommunityHeader struct {
|
|||
PrivilegesLevel token.PrivilegesLevel `json:"privileges_level"`
|
||||
}
|
||||
|
||||
type CommunityCollectibleHeader struct {
|
||||
ID thirdparty.CollectibleUniqueID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
CommunityHeader CommunityHeader `json:"community_header"`
|
||||
}
|
||||
|
||||
func fullCollectibleDataToHeader(c thirdparty.FullCollectibleData) CollectibleHeader {
|
||||
ret := CollectibleHeader{
|
||||
ID: c.CollectibleData.ID,
|
||||
|
@ -77,3 +83,12 @@ func fullCollectibleDataToDetails(c thirdparty.FullCollectibleData) CollectibleD
|
|||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func communityInfoToHeader(c thirdparty.CollectiblesCommunityInfo) CommunityHeader {
|
||||
return CommunityHeader{
|
||||
CommunityID: c.CommunityID,
|
||||
CommunityName: c.CommunityName,
|
||||
CommunityColor: c.CommunityColor,
|
||||
PrivilegesLevel: c.PrivilegesLevel,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue