fix: limit number of concurrent requests

This commit is contained in:
Richard Ramos 2022-02-13 18:51:43 -04:00
parent a23ee30565
commit 559e6dddfa
10 changed files with 280 additions and 18 deletions

1
go.mod
View File

@ -65,6 +65,7 @@ require (
github.com/wealdtech/go-ens/v3 v3.5.0 github.com/wealdtech/go-ens/v3 v3.5.0
github.com/wealdtech/go-multicodec v1.4.0 github.com/wealdtech/go-multicodec v1.4.0
github.com/xeipuuv/gojsonschema v1.2.0 github.com/xeipuuv/gojsonschema v1.2.0
github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15
go.uber.org/zap v1.19.0 go.uber.org/zap v1.19.0
golang.org/x/crypto v0.0.0-20211202192323-5770296d904e golang.org/x/crypto v0.0.0-20211202192323-5770296d904e
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb

2
go.sum
View File

@ -1308,6 +1308,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15 h1:0UwX38TojH86Dl0n4OwPMNYy8hNs2NhtzwY4RlvANlA=
github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15/go.mod h1:DUzu/BC4TkgUfXP8J1P6Md73Djt+0l0CHq001Pt4weA=
gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE= gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=

View File

@ -7,12 +7,12 @@ import (
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multibase" "github.com/multiformats/go-multibase"
"github.com/wealdtech/go-multicodec" "github.com/wealdtech/go-multicodec"
"github.com/zenthangplus/goccm"
"olympos.io/encoding/edn" "olympos.io/encoding/edn"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
@ -31,6 +31,7 @@ import (
) )
const ipfsGateway = ".ipfs.cf-ipfs.com" const ipfsGateway = ".ipfs.cf-ipfs.com"
const maxConcurrentRequests = 3
// ConnectionType constants // ConnectionType constants
type stickerStatus int type stickerStatus int
@ -104,7 +105,6 @@ func NewAPI(ctx context.Context, appDB *sql.DB, rpcClient *rpc.Client, accountsM
func (api *API) Market(chainID uint64) ([]StickerPack, error) { func (api *API) Market(chainID uint64) ([]StickerPack, error) {
// TODO: eventually this should be changed to include pagination // TODO: eventually this should be changed to include pagination
accounts, err := api.accountsDB.GetAccounts() accounts, err := api.accountsDB.GetAccounts()
if err != nil { if err != nil {
return nil, err return nil, err
@ -163,13 +163,17 @@ func (api *API) execTokenPackID(chainID uint64, tokenIDs []*big.Int, resultChan
return return
} }
if len(tokenIDs) == 0 {
return
}
callOpts := &bind.CallOpts{Context: api.ctx, Pending: false} callOpts := &bind.CallOpts{Context: api.ctx, Pending: false}
wg := sync.WaitGroup{} c := goccm.New(maxConcurrentRequests)
wg.Add(len(tokenIDs))
for _, tokenID := range tokenIDs { for _, tokenID := range tokenIDs {
c.Wait()
go func(tokenID *big.Int) { go func(tokenID *big.Int) {
defer wg.Done() defer c.Done()
packID, err := stickerPack.TokenPackId(callOpts, tokenID) packID, err := stickerPack.TokenPackId(callOpts, tokenID)
if err != nil { if err != nil {
errChan <- err errChan <- err
@ -178,7 +182,7 @@ func (api *API) execTokenPackID(chainID uint64, tokenIDs []*big.Int, resultChan
resultChan <- packID resultChan <- packID
}(tokenID) }(tokenID)
} }
wg.Wait() c.WaitAllDone()
} }
func (api *API) getTokenPackIDs(chainID uint64, tokenIDs []*big.Int) ([]*big.Int, error) { func (api *API) getTokenPackIDs(chainID uint64, tokenIDs []*big.Int) ([]*big.Int, error) {
@ -292,11 +296,15 @@ func (api *API) fetchStickerPacks(chainID uint64, resultChan chan<- *StickerPack
return return
} }
wg := sync.WaitGroup{} if numPacks.Uint64() == 0 {
wg.Add(int(numPacks.Int64())) return
}
c := goccm.New(maxConcurrentRequests)
for i := uint64(0); i < numPacks.Uint64(); i++ { for i := uint64(0); i < numPacks.Uint64(); i++ {
c.Wait()
go func(i uint64) { go func(i uint64) {
defer wg.Done() defer c.Done()
packID := new(big.Int).SetUint64(i) packID := new(big.Int).SetUint64(i)
@ -319,7 +327,7 @@ func (api *API) fetchStickerPacks(chainID uint64, resultChan chan<- *StickerPack
resultChan <- stickerPack resultChan <- stickerPack
}(i) }(i)
} }
wg.Wait() c.WaitAllDone()
} }
func (api *API) fetchPackData(stickerType *stickers.StickerType, packID *big.Int, translateHashes bool) (*StickerPack, error) { func (api *API) fetchPackData(stickerType *stickers.StickerType, packID *big.Int, translateHashes bool) (*StickerPack, error) {
@ -469,11 +477,15 @@ func (api *API) getAccountsPurchasedPack(chainID uint64, accs []accounts.Account
defer close(errChan) defer close(errChan)
defer close(resultChan) defer close(resultChan)
wg := sync.WaitGroup{} if len(accs) == 0 {
wg.Add(len(accs)) return
}
c := goccm.New(maxConcurrentRequests)
for _, account := range accs { for _, account := range accs {
c.Wait()
go func(acc accounts.Account) { go func(acc accounts.Account) {
defer wg.Done() defer c.Done()
packs, err := api.getPurchasedPackIDs(chainID, acc.Address) packs, err := api.getPurchasedPackIDs(chainID, acc.Address)
if err != nil { if err != nil {
errChan <- err errChan <- err
@ -485,7 +497,7 @@ func (api *API) getAccountsPurchasedPack(chainID uint64, accs []accounts.Account
} }
}(account) }(account)
} }
wg.Wait() c.WaitAllDone()
} }
func (api *API) execTokenOwnerOfIndex(chainID uint64, account types.Address, balance *big.Int, resultChan chan<- *big.Int, errChan chan<- error, doneChan chan<- struct{}) { func (api *API) execTokenOwnerOfIndex(chainID uint64, account types.Address, balance *big.Int, resultChan chan<- *big.Int, errChan chan<- error, doneChan chan<- struct{}) {
@ -499,13 +511,17 @@ func (api *API) execTokenOwnerOfIndex(chainID uint64, account types.Address, bal
return return
} }
if balance.Int64() == 0 {
return
}
callOpts := &bind.CallOpts{Context: api.ctx, Pending: false} callOpts := &bind.CallOpts{Context: api.ctx, Pending: false}
wg := sync.WaitGroup{} c := goccm.New(maxConcurrentRequests)
wg.Add(int(balance.Int64()))
for i := uint64(0); i < balance.Uint64(); i++ { for i := uint64(0); i < balance.Uint64(); i++ {
c.Wait()
go func(i uint64) { go func(i uint64) {
defer wg.Done() defer c.Done()
tokenID, err := stickerPack.TokenOfOwnerByIndex(callOpts, common.Address(account), new(big.Int).SetUint64(i)) tokenID, err := stickerPack.TokenOfOwnerByIndex(callOpts, common.Address(account), new(big.Int).SetUint64(i))
if err != nil { if err != nil {
errChan <- err errChan <- err
@ -515,7 +531,7 @@ func (api *API) execTokenOwnerOfIndex(chainID uint64, account types.Address, bal
resultChan <- tokenID resultChan <- tokenID
}(i) }(i)
} }
wg.Wait() c.WaitAllDone()
} }
func (api *API) getTokenOwnerOfIndex(chainID uint64, account types.Address, balance *big.Int) ([]*big.Int, error) { func (api *API) getTokenOwnerOfIndex(chainID uint64, account types.Address, balance *big.Int) ([]*big.Int, error) {

1
vendor/github.com/zenthangplus/goccm/.gitignore generated vendored Normal file
View File

@ -0,0 +1 @@
.idea

16
vendor/github.com/zenthangplus/goccm/.travis.yml generated vendored Normal file
View File

@ -0,0 +1,16 @@
language: go
go:
- 1.x
- 1.10.x
- 1.11.x
- 1.12.x
before_install:
- go get -t -v ./...
script:
- go test -coverprofile=coverage.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash)

21
vendor/github.com/zenthangplus/goccm/LICENSE generated vendored Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2019 ZENTHANG
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

76
vendor/github.com/zenthangplus/goccm/README.md generated vendored Normal file
View File

@ -0,0 +1,76 @@
# Golang Concurrency Manager [![Build Status](https://travis-ci.com/zenthangplus/goccm.svg?branch=master)](https://travis-ci.com/zenthangplus/goccm)
Golang Concurrency Manager package limits the number of goroutines that are allowed to run concurrently.
### Installation
Run the following command to install this package:
```
$ go get -u github.com/zenthangplus/goccm
```
### Example
```go
package main
import (
"fmt"
"github.com/zenthangplus/goccm"
"time"
)
func main() {
// Limit 3 goroutines to run concurrently.
c := goccm.New(3)
for i := 1; i <= 10; i++ {
// This function has to call before any goroutine
c.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)
// This function has to when a goroutine has finished
// Or you can use `defer c.Done()` at the top of goroutine.
c.Done()
}(i)
}
// This function has to call to ensure all goroutines have finished
// after close the main program.
c.WaitAllDone()
}
```
### List of supported functions
```go
package main
import "github.com/zenthangplus/goccm"
func main() {
// Create the concurrency manager
// The first argument is the maximum number of goroutines to run concurrently.
c := goccm.New(10)
// Wait until a slot is available for the new goroutine.
c.Wait()
// Mark a goroutine as finished
c.Done()
// Wait for all goroutines are done
c.WaitAllDone()
// Close the manager manually
c.Close()
// Returns the number of goroutines which are running
c.RunningCount()
}
```

3
vendor/github.com/zenthangplus/goccm/go.mod generated vendored Normal file
View File

@ -0,0 +1,3 @@
module github.com/zenthangplus/goccm
go 1.14

124
vendor/github.com/zenthangplus/goccm/goccm.go generated vendored Normal file
View File

@ -0,0 +1,124 @@
package goccm
import "sync/atomic"
type (
// ConcurrencyManager Interface
ConcurrencyManager interface {
// Wait until a slot is available for the new goroutine.
Wait()
// Mark a goroutine as finished
Done()
// Close the manager manually
Close()
// Wait for all goroutines are done
WaitAllDone()
// Returns the number of goroutines which are running
RunningCount() int32
}
concurrencyManager struct {
// The number of goroutines that are allowed to run concurrently
max int
// The manager channel to coordinate the number of concurrent goroutines.
managerCh chan interface{}
// The done channel indicates when a single goroutine has finished its job.
doneCh chan bool
// This channel indicates when all goroutines have finished their job.
allDoneCh chan bool
// The close flag allows we know when we can close the manager
closed bool
// The running count allows we know the number of goroutines are running
runningCount int32
}
)
// New concurrencyManager
func New(maxGoRoutines int) *concurrencyManager {
// Initiate the manager object
c := concurrencyManager{
max: maxGoRoutines,
managerCh: make(chan interface{}, maxGoRoutines),
doneCh: make(chan bool),
allDoneCh: make(chan bool),
}
// Fill the manager channel by placeholder values
for i := 0; i < c.max; i++ {
c.managerCh <- nil
}
// Start the controller to collect all the jobs
go c.controller()
return &c
}
// Create the controller to collect all the jobs.
// When a goroutine is finished, we can release a slot for another goroutine.
func (c *concurrencyManager) controller() {
for {
// This will block until a goroutine is finished
<-c.doneCh
// Say that another goroutine can now start
c.managerCh <- nil
// When the closed flag is set,
// we need to close the manager if it doesn't have any running goroutine
if c.closed == true && c.runningCount == 0 {
break
}
}
// Say that all goroutines are finished, we can close the manager
c.allDoneCh <- true
}
// Wait until a slot is available for the new goroutine.
// A goroutine have to start after this function.
func (c *concurrencyManager) Wait() {
// Try to receive from the manager channel. When we have something,
// it means a slot is available and we can start a new goroutine.
// Otherwise, it will block until a slot is available.
<-c.managerCh
// Increase the running count to help we know how many goroutines are running.
atomic.AddInt32(&c.runningCount, 1)
}
// Mark a goroutine as finished
func (c *concurrencyManager) Done() {
// Decrease the number of running count
atomic.AddInt32(&c.runningCount, -1)
c.doneCh <- true
}
// Close the manager manually
func (c *concurrencyManager) Close() {
c.closed = true
}
// Wait for all goroutines are done
func (c *concurrencyManager) WaitAllDone() {
// Close the manager automatic
c.Close()
// This will block until allDoneCh was marked
<-c.allDoneCh
}
// Returns the number of goroutines which are running
func (c *concurrencyManager) RunningCount() int32 {
return c.runningCount
}

2
vendor/modules.txt vendored
View File

@ -556,6 +556,8 @@ github.com/xeipuuv/gojsonpointer
github.com/xeipuuv/gojsonreference github.com/xeipuuv/gojsonreference
# github.com/xeipuuv/gojsonschema v1.2.0 # github.com/xeipuuv/gojsonschema v1.2.0
github.com/xeipuuv/gojsonschema github.com/xeipuuv/gojsonschema
# github.com/zenthangplus/goccm v0.0.0-20211005163543-2f2e522aca15
github.com/zenthangplus/goccm
# go.opencensus.io v0.23.0 # go.opencensus.io v0.23.0
go.opencensus.io/internal/tagencoding go.opencensus.io/internal/tagencoding
go.opencensus.io/metric/metricdata go.opencensus.io/metric/metricdata