wallet-fetcher: adding a better way to handle the rate limiting

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2024-01-22 18:56:18 +01:00
parent f367ff8728
commit 358063820f
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
4 changed files with 23 additions and 26 deletions

View File

@ -10,7 +10,7 @@ data:
connectorSubtype: api connectorSubtype: api
connectorType: source connectorType: source
definitionId: 1e55cfe0-f591-4281-9a20-18d89d45f685 definitionId: 1e55cfe0-f591-4281-9a20-18d89d45f685
dockerImageTag: 0.3.0 dockerImageTag: 0.5.0
dockerRepository: harbor.status.im/status-im/airbyte/wallet-fetcher dockerRepository: harbor.status.im/status-im/airbyte/wallet-fetcher
githubIssueLabel: source-wallet-fetcher githubIssueLabel: source-wallet-fetcher
icon: icon.svg icon: icon.svg

View File

@ -26,12 +26,6 @@
"string" "string"
] ]
}, },
"address": {
"type": [
"null",
"string"
]
},
"chain": { "chain": {
"type": [ "type": [
"null", "null",

View File

@ -26,12 +26,6 @@
"string" "string"
] ]
}, },
"address": {
"type": [
"null",
"string"
]
},
"chain": { "chain": {
"type": [ "type": [
"null", "null",

View File

@ -3,7 +3,7 @@ from airbyte_cdk.sources.streams.http import HttpStream
import logging import logging
import requests import requests
import json import json
import time
logger = logging.getLogger("airbyte") logger = logging.getLogger("airbyte")
@ -28,6 +28,17 @@ class BlockchainStream(HttpStream):
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None return None
def backoff_time(self, response: requests.Response) -> Optional[float]:
"""This method is called if we run into the rate limit.
Slack puts the retry time in the `Retry-After` response header so we
we return that value. If the response is anything other than a 429 (e.g: 5XX)
fall back on default retry behavior.
"""
if "Retry-After" in response.headers:
return int(response.headers["Retry-After"])
else:
self.logger.info("Retry-after header not found. Using default backoff value")
return 5
@ -52,7 +63,6 @@ class BitcoinToken(BlockchainStream):
"name":"BTC", "name":"BTC",
"symbol":"BTC", "symbol":"BTC",
"description": "Bitcoin", "description": "Bitcoin",
"address":"",
"chain": "bitcoin", "chain": "bitcoin",
"balance": bitcoin_data['final_balance'], "balance": bitcoin_data['final_balance'],
"decimal":8, "decimal":8,
@ -71,26 +81,25 @@ class EthereumToken(BlockchainStream):
stream_slice: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
**kwargs **kwargs
) -> Iterable[Mapping]: ) -> Iterable[Mapping]:
logger.info("Getting ETH balance information") logger.info("Getting ETH balance information %s", stream_slice['name'])
eth_data=response.json()['ETH'] eth_data=response.json()['ETH']
yield { yield {
"wallet_name": stream_slice['name'], "wallet_name": stream_slice['name'],
"name":"ETH", "name":"ETH",
"symbol":"ETH", "symbol":"ETH",
"description": "Native Ethereum token", "description": "Native Ethereum token",
"address":"",
"chain": "Ethereum", "chain": "Ethereum",
"balance":eth_data['rawBalance'], "balance":eth_data['rawBalance'],
"decimal":18, "decimal":18,
"tags": stream_slice['tags'] "tags": stream_slice['tags']
} }
logging.info("Fetching Tokens balance information") if 'tokens' in response.json():
tokens_data=response.json()['tokens'] tokens_data=response.json()['tokens']
for t in tokens_data: for t in tokens_data:
try: try:
yield extract_token(stream_slice['name'], t) yield extract_token(stream_slice['name'], t)
except Exception as e: except Exception as e:
logger.error('Dropping token not valid %s' % t ) logger.debug('Dropping token not valid %s' % t )
# Delaying calls - Not great but that works
time.sleep(2)