From 358063820ffecb3e37afd257d51907bef603a5bf Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Mon, 22 Jan 2024 18:56:18 +0100 Subject: [PATCH] wallet-fetcher: adding a better way to handle the rate limiting Signed-off-by: Alexis Pentori --- wallet-fetcher/metadata.yaml | 2 +- .../schemas/bitcoin_token.json | 6 ---- .../schemas/ethereum_token.json | 6 ---- .../source_wallet_fetcher/stream.py | 35 ++++++++++++------- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/wallet-fetcher/metadata.yaml b/wallet-fetcher/metadata.yaml index 78c2306..0b9d8cb 100644 --- a/wallet-fetcher/metadata.yaml +++ b/wallet-fetcher/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: 1e55cfe0-f591-4281-9a20-18d89d45f685 - dockerImageTag: 0.3.0 + dockerImageTag: 0.5.0 dockerRepository: harbor.status.im/status-im/airbyte/wallet-fetcher githubIssueLabel: source-wallet-fetcher icon: icon.svg diff --git a/wallet-fetcher/source_wallet_fetcher/schemas/bitcoin_token.json b/wallet-fetcher/source_wallet_fetcher/schemas/bitcoin_token.json index d592d9a..bd4b42e 100644 --- a/wallet-fetcher/source_wallet_fetcher/schemas/bitcoin_token.json +++ b/wallet-fetcher/source_wallet_fetcher/schemas/bitcoin_token.json @@ -26,12 +26,6 @@ "string" ] }, - "address": { - "type": [ - "null", - "string" - ] - }, "chain": { "type": [ "null", diff --git a/wallet-fetcher/source_wallet_fetcher/schemas/ethereum_token.json b/wallet-fetcher/source_wallet_fetcher/schemas/ethereum_token.json index d592d9a..bd4b42e 100644 --- a/wallet-fetcher/source_wallet_fetcher/schemas/ethereum_token.json +++ b/wallet-fetcher/source_wallet_fetcher/schemas/ethereum_token.json @@ -26,12 +26,6 @@ "string" ] }, - "address": { - "type": [ - "null", - "string" - ] - }, "chain": { "type": [ "null", diff --git a/wallet-fetcher/source_wallet_fetcher/stream.py b/wallet-fetcher/source_wallet_fetcher/stream.py index e0d42c2..6c785f6 100644 --- a/wallet-fetcher/source_wallet_fetcher/stream.py +++ b/wallet-fetcher/source_wallet_fetcher/stream.py @@ -3,7 +3,7 @@ from airbyte_cdk.sources.streams.http import HttpStream import logging import requests import json - +import time logger = logging.getLogger("airbyte") @@ -28,6 +28,17 @@ class BlockchainStream(HttpStream): def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None + def backoff_time(self, response: requests.Response) -> Optional[float]: + """This method is called if we run into the rate limit. + Slack puts the retry time in the `Retry-After` response header so we + we return that value. If the response is anything other than a 429 (e.g: 5XX) + fall back on default retry behavior. + """ + if "Retry-After" in response.headers: + return int(response.headers["Retry-After"]) + else: + self.logger.info("Retry-after header not found. Using default backoff value") + return 5 @@ -52,7 +63,6 @@ class BitcoinToken(BlockchainStream): "name":"BTC", "symbol":"BTC", "description": "Bitcoin", - "address":"", "chain": "bitcoin", "balance": bitcoin_data['final_balance'], "decimal":8, @@ -71,26 +81,25 @@ class EthereumToken(BlockchainStream): stream_slice: Mapping[str, Any] = None, **kwargs ) -> Iterable[Mapping]: - logger.info("Getting ETH balance information") + logger.info("Getting ETH balance information %s", stream_slice['name']) eth_data=response.json()['ETH'] yield { "wallet_name": stream_slice['name'], "name":"ETH", "symbol":"ETH", "description": "Native Ethereum token", - "address":"", "chain": "Ethereum", "balance":eth_data['rawBalance'], "decimal":18, "tags": stream_slice['tags'] } - logging.info("Fetching Tokens balance information") - tokens_data=response.json()['tokens'] - for t in tokens_data: - try: - yield extract_token(stream_slice['name'], t) - except Exception as e: - logger.error('Dropping token not valid %s' % t ) - - + if 'tokens' in response.json(): + tokens_data=response.json()['tokens'] + for t in tokens_data: + try: + yield extract_token(stream_slice['name'], t) + except Exception as e: + logger.debug('Dropping token not valid %s' % t ) + # Delaying calls - Not great but that works + time.sleep(2)