fix: added global lock for Opensea requests. Increased number of retries.
This commit is contained in:
parent
eb4257a904
commit
48e16317a7
|
@ -2,7 +2,6 @@ package opensea
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
@ -22,19 +21,28 @@ import (
|
|||
const AssetLimit = 200
|
||||
const CollectionLimit = 300
|
||||
|
||||
const RequestRetryMaxCount = 1
|
||||
const RequestWaitTime = 300 * time.Millisecond
|
||||
|
||||
var OpenseaClientInstances = make(map[uint64]*Client)
|
||||
|
||||
var BaseURLs = map[uint64]string{
|
||||
1: "https://api.opensea.io/api/v1",
|
||||
4: "https://rinkeby-api.opensea.io/api/v1",
|
||||
5: "https://testnets-api.opensea.io/api/v1",
|
||||
}
|
||||
const RequestTimeout = 5 * time.Second
|
||||
const GetRequestRetryMaxCount = 15
|
||||
const GetRequestWaitTime = 300 * time.Millisecond
|
||||
|
||||
const ChainIDRequiringAPIKey = 1
|
||||
|
||||
func getbaseURL(chainID uint64) (string, error) {
|
||||
switch chainID {
|
||||
case 1:
|
||||
return "https://api.opensea.io/api/v1", nil
|
||||
case 4:
|
||||
return "https://rinkeby-api.opensea.io/api/v1", nil
|
||||
case 5:
|
||||
return "https://testnets-api.opensea.io/api/v1", nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("chainID not supported")
|
||||
}
|
||||
|
||||
var OpenseaClientInstances = make(map[uint64]*Client)
|
||||
var OpenseaHTTPClient *HTTPClient = nil
|
||||
|
||||
type TraitValue string
|
||||
|
||||
func (st *TraitValue) UnmarshalJSON(b []byte) error {
|
||||
|
@ -125,18 +133,85 @@ type OwnedCollection struct {
|
|||
OwnedAssetCount *bigint.BigInt `json:"owned_asset_count"`
|
||||
}
|
||||
|
||||
type HTTPClient struct {
|
||||
client *http.Client
|
||||
getRequestLock sync.RWMutex
|
||||
}
|
||||
|
||||
func newHTTPClient() *HTTPClient {
|
||||
return &HTTPClient{
|
||||
client: &http.Client{
|
||||
Timeout: RequestTimeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (o *HTTPClient) doGetRequest(url string, apiKey string) ([]byte, error) {
|
||||
// Ensure only one thread makes a request at a time
|
||||
o.getRequestLock.Lock()
|
||||
defer o.getRequestLock.Unlock()
|
||||
|
||||
retryCount := 0
|
||||
statusCode := http.StatusOK
|
||||
|
||||
for {
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:96.0) Gecko/20100101 Firefox/96.0")
|
||||
if len(apiKey) > 0 {
|
||||
req.Header.Set("X-API-KEY", apiKey)
|
||||
}
|
||||
|
||||
resp, err := o.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
log.Error("failed to close opensea request body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
statusCode = resp.StatusCode
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
return body, err
|
||||
case http.StatusTooManyRequests:
|
||||
if retryCount < GetRequestRetryMaxCount {
|
||||
// sleep and retry
|
||||
time.Sleep(GetRequestWaitTime)
|
||||
retryCount++
|
||||
continue
|
||||
}
|
||||
// break and error
|
||||
default:
|
||||
// break and error
|
||||
}
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("unsuccessful request: %d %s", statusCode, http.StatusText(statusCode))
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
client *http.Client
|
||||
client *HTTPClient
|
||||
url string
|
||||
apiKey string
|
||||
IsConnected bool
|
||||
LastCheckedAt int64
|
||||
IsConnectedLock sync.RWMutex
|
||||
requestLock sync.RWMutex
|
||||
}
|
||||
|
||||
// new opensea client.
|
||||
func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) {
|
||||
if OpenseaHTTPClient == nil {
|
||||
OpenseaHTTPClient = newHTTPClient()
|
||||
}
|
||||
|
||||
var tmpAPIKey string = ""
|
||||
if chainID == ChainIDRequiringAPIKey {
|
||||
tmpAPIKey = apiKey
|
||||
|
@ -147,22 +222,20 @@ func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) {
|
|||
}
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
if url, ok := BaseURLs[chainID]; ok {
|
||||
openseaClient := &Client{
|
||||
client: client,
|
||||
url: url,
|
||||
apiKey: tmpAPIKey,
|
||||
IsConnected: true,
|
||||
LastCheckedAt: time.Now().Unix(),
|
||||
}
|
||||
OpenseaClientInstances[chainID] = openseaClient
|
||||
return openseaClient, nil
|
||||
baseURL, err := getbaseURL(chainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, errors.New("ChainID not supported")
|
||||
openseaClient := &Client{
|
||||
client: OpenseaHTTPClient,
|
||||
url: baseURL,
|
||||
apiKey: tmpAPIKey,
|
||||
IsConnected: true,
|
||||
LastCheckedAt: time.Now().Unix(),
|
||||
}
|
||||
OpenseaClientInstances[chainID] = openseaClient
|
||||
return openseaClient, nil
|
||||
}
|
||||
|
||||
func (o *Client) setConnected(value bool) {
|
||||
|
@ -177,7 +250,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec
|
|||
var collections []OwnedCollection
|
||||
for {
|
||||
url := fmt.Sprintf("%s/collections?asset_owner=%s&offset=%d&limit=%d", o.url, owner, offset, CollectionLimit)
|
||||
body, err := o.doOpenseaRequest(url)
|
||||
body, err := o.client.doGetRequest(url, o.apiKey)
|
||||
if err != nil {
|
||||
o.setConnected(false)
|
||||
return nil, err
|
||||
|
@ -273,7 +346,7 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer
|
|||
for {
|
||||
url := o.url + "/assets?" + queryParams.Encode()
|
||||
|
||||
body, err := o.doOpenseaRequest(url)
|
||||
body, err := o.client.doGetRequest(url, o.apiKey)
|
||||
if err != nil {
|
||||
o.setConnected(false)
|
||||
return nil, err
|
||||
|
@ -314,54 +387,3 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer
|
|||
o.setConnected(true)
|
||||
return assets, nil
|
||||
}
|
||||
|
||||
func (o *Client) doOpenseaRequest(url string) ([]byte, error) {
|
||||
// Ensure only one thread makes a request at a time
|
||||
o.requestLock.Lock()
|
||||
defer o.requestLock.Unlock()
|
||||
|
||||
retryCount := 0
|
||||
statusCode := http.StatusOK
|
||||
|
||||
for {
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:96.0) Gecko/20100101 Firefox/96.0")
|
||||
if len(o.apiKey) > 0 {
|
||||
req.Header.Set("X-API-KEY", o.apiKey)
|
||||
}
|
||||
|
||||
resp, err := o.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
log.Error("failed to close opensea request body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
statusCode = resp.StatusCode
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
return body, err
|
||||
case http.StatusTooManyRequests:
|
||||
if retryCount < RequestRetryMaxCount {
|
||||
// sleep and retry
|
||||
time.Sleep(RequestWaitTime)
|
||||
retryCount++
|
||||
continue
|
||||
}
|
||||
// break and error
|
||||
default:
|
||||
// break and error
|
||||
}
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("unsuccessful request: %d %s", statusCode, http.StatusText(statusCode))
|
||||
}
|
||||
|
|
|
@ -39,8 +39,11 @@ func TestFetchAllCollectionsByOwner(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
opensea := &Client{
|
||||
client := &HTTPClient{
|
||||
client: srv.Client(),
|
||||
}
|
||||
opensea := &Client{
|
||||
client: client,
|
||||
url: srv.URL,
|
||||
}
|
||||
res, err := opensea.FetchAllCollectionsByOwner(common.Address{1})
|
||||
|
@ -58,8 +61,11 @@ func TestFetchAllCollectionsByOwnerWithInValidJson(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
opensea := &Client{
|
||||
client := &HTTPClient{
|
||||
client: srv.Client(),
|
||||
}
|
||||
opensea := &Client{
|
||||
client: client,
|
||||
url: srv.URL,
|
||||
}
|
||||
res, err := opensea.FetchAllCollectionsByOwner(common.Address{1})
|
||||
|
@ -92,8 +98,11 @@ func TestFetchAllAssetsByOwnerAndCollection(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
opensea := &Client{
|
||||
client := &HTTPClient{
|
||||
client: srv.Client(),
|
||||
}
|
||||
opensea := &Client{
|
||||
client: client,
|
||||
url: srv.URL,
|
||||
}
|
||||
res, err := opensea.FetchAllAssetsByOwnerAndCollection(common.Address{1}, "rocky", "", 200)
|
||||
|
@ -111,8 +120,11 @@ func TestFetchAllAssetsByOwnerAndCollectionInvalidJson(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
opensea := &Client{
|
||||
client := &HTTPClient{
|
||||
client: srv.Client(),
|
||||
}
|
||||
opensea := &Client{
|
||||
client: client,
|
||||
url: srv.URL,
|
||||
}
|
||||
res, err := opensea.FetchAllAssetsByOwnerAndCollection(common.Address{1}, "rocky", "", 200)
|
||||
|
|
Loading…
Reference in New Issue