From b547fe6489b3b960bb855a6df03a329e6caf30f3 Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Thu, 7 Dec 2023 10:02:21 +0100 Subject: [PATCH] wallet-fetcher: stream refactoring Signed-off-by: Alexis Pentori --- .../source_wallet_fetcher/source.py | 92 +------------------ .../source_wallet_fetcher/stream.py | 91 ++++++++++++++++++ 2 files changed, 92 insertions(+), 91 deletions(-) create mode 100644 wallet-fetcher/source_wallet_fetcher/stream.py diff --git a/wallet-fetcher/source_wallet_fetcher/source.py b/wallet-fetcher/source_wallet_fetcher/source.py index 0d6ad2b..7ef358c 100644 --- a/wallet-fetcher/source_wallet_fetcher/source.py +++ b/wallet-fetcher/source_wallet_fetcher/source.py @@ -6,6 +6,7 @@ #from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple from .utils import extract_token +from .stream import BitcoinToken, EthereumToken import logging import requests import json @@ -15,97 +16,6 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator logger = logging.getLogger("airbyte") - - -class BitcoinToken(HttpStream): - url_base = "https://blockchain.info/rawaddr/" - - primary_key = None - - def __init__(self, wallets: List[str], **kwargs): - super().__init__(**kwargs) - self.wallets = wallets - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - for wallet in self.wallets: - yield { - "address": wallet['address'], - "name": wallet['name'] - } - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return f"{stream_slice['address']}" - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - - def parse_response( - self, - response: requests.Response, - stream_slice: Mapping[str, Any] = None, - **kwargs - ) -> Iterable[Mapping]: - logger.info("Getting Bitcoin Balance information") - bitcoin_data = response.json() - yield { - "wallet_name": stream_slice['name'], - "name":"BTC", - "symbol":"BTC", - "description": "Bitcoin", - "address":"", - "chain": "bitcoin", - "balance": bitcoin_data['final_balance'], - "decimal":8 - } - -class EthereumToken(HttpStream): - - url_base = "https://api.ethplorer.io/getAddressInfo/" - - primary_key = None - - def __init__(self, wallets: List[str], **kwargs): - super().__init__(**kwargs) - self.wallets = wallets - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - for wallet in self.wallets: - yield { - "address": wallet['address'], - "name": wallet['name'] - } - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return f"{stream_slice['address']}?apiKey=freekey" - - def parse_response(self, - response: requests.Response, - stream_slice: Mapping[str, Any] = None, - **kwargs - ) -> Iterable[Mapping]: - logger.info("Getting ETH balance information") - eth_data=response.json()['ETH'] - yield { - "wallet_name": stream_slice['name'], - "name":"ETH", - "symbol":"ETH", - "description": "Native Ethereum token", - "address":"", - "chain": "Ethereum", - "balance":eth_data['rawBalance'], - "decimal":18 - } - logging.info("Fetching Tokens balance information") - tokens_data=response.json()['tokens'] - for t in tokens_data: - try: - yield extract_token(stream_slice['name'], t) - except Exception as e: - logger.error('Dropping token not valid %s' % t ) - # Source class SourceWalletFetcher(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: diff --git a/wallet-fetcher/source_wallet_fetcher/stream.py b/wallet-fetcher/source_wallet_fetcher/stream.py new file mode 100644 index 0000000..522573e --- /dev/null +++ b/wallet-fetcher/source_wallet_fetcher/stream.py @@ -0,0 +1,91 @@ +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from airbyte_cdk.sources.streams.http import HttpStream +import logging +import requests +import json + + +logger = logging.getLogger("airbyte") + + +class BlockchainStream(HttpStream): + + primary_key = None + + + def __init__(self, wallets: List['str'], **kwargs): + super().__init__(**kwargs) + self.wallets = wallets + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + for wallet in self.wallets: + yield { + "address": wallet['address'], + "name": wallet['name'] + } + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + + + +class BitcoinToken(BlockchainStream): + + url_base = "https://blockchain.info/rawaddr/" + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f"{stream_slice['address']}" + + def parse_response( + self, + response: requests.Response, + stream_slice: Mapping[str, Any] = None, + **kwargs + ) -> Iterable[Mapping]: + logger.info("Getting Bitcoin Balance information") + bitcoin_data = response.json() + yield { + "wallet_name": stream_slice['name'], + "name":"BTC", + "symbol":"BTC", + "description": "Bitcoin", + "address":"", + "chain": "bitcoin", + "balance": bitcoin_data['final_balance'], + "decimal":8 + } + +class EthereumToken(BlockchainStream): + + url_base = "https://api.ethplorer.io/getAddressInfo/" + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f"{stream_slice['address']}?apiKey=freekey" + + def parse_response(self, + response: requests.Response, + stream_slice: Mapping[str, Any] = None, + **kwargs + ) -> Iterable[Mapping]: + logger.info("Getting ETH balance information") + eth_data=response.json()['ETH'] + yield { + "wallet_name": stream_slice['name'], + "name":"ETH", + "symbol":"ETH", + "description": "Native Ethereum token", + "address":"", + "chain": "Ethereum", + "balance":eth_data['rawBalance'], + "decimal":18 + } + logging.info("Fetching Tokens balance information") + tokens_data=response.json()['tokens'] + for t in tokens_data: + try: + yield extract_token(stream_slice['name'], t) + except Exception as e: + logger.error('Dropping token not valid %s' % t ) + +