From 967db04e304404fc2f073361b122de63395ddbc3 Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Thu, 30 Nov 2023 11:02:08 +0100 Subject: [PATCH] Finishing basic Eth wallet import Making basic wallet extraction working Adding Dockerfile Signed-off-by: Alexis Pentori --- Dockerfile | 8 + sample_files/configured_catalog.json | 23 +++ sample_files/wallet.json | 3 + source_wallet_fetcher/schemas/customers.json | 16 -- source_wallet_fetcher/schemas/employees.json | 19 -- source_wallet_fetcher/schemas/token.json | 2 +- source_wallet_fetcher/source.py | 181 +++---------------- source_wallet_fetcher/utils.py | 24 +++ 8 files changed, 82 insertions(+), 194 deletions(-) create mode 100644 Dockerfile create mode 100644 sample_files/configured_catalog.json create mode 100644 sample_files/wallet.json delete mode 100644 source_wallet_fetcher/schemas/customers.json delete mode 100644 source_wallet_fetcher/schemas/employees.json create mode 100644 source_wallet_fetcher/utils.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0d1e900 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,8 @@ +FROM airbyte/python-connector-base:1.1.0 + +COPY . 
./airbyte/integration_code +RUN pip install ./airbyte/integration_code + +# The entrypoint and default env vars are already set in the base image +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] diff --git a/sample_files/configured_catalog.json b/sample_files/configured_catalog.json new file mode 100644 index 0000000..4d1a020 --- /dev/null +++ b/sample_files/configured_catalog.json @@ -0,0 +1,23 @@ +{ + "streams": [ + { + "stream": { + "name": "token", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "wallet_address": { + "type": "string" + } + } + }, + "supported_sync_modes": [ + "full_refresh" + ] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/sample_files/wallet.json b/sample_files/wallet.json new file mode 100644 index 0000000..db01285 --- /dev/null +++ b/sample_files/wallet.json @@ -0,0 +1,3 @@ +{ + "wallet_address": "0x23f4569002a5A07f0Ecf688142eEB6bcD883eeF8" +} diff --git a/source_wallet_fetcher/schemas/customers.json b/source_wallet_fetcher/schemas/customers.json deleted file mode 100644 index 9a4b134..0000000 --- a/source_wallet_fetcher/schemas/customers.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "signup_date": { - "type": ["null", "string"], - "format": "date-time" - } - } -} diff --git a/source_wallet_fetcher/schemas/employees.json b/source_wallet_fetcher/schemas/employees.json deleted file mode 100644 index 2fa01a0..0000000 --- a/source_wallet_fetcher/schemas/employees.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", 
"string"] - }, - "years_of_service": { - "type": ["null", "integer"] - }, - "start_date": { - "type": ["null", "string"], - "format": "date-time" - } - } -} diff --git a/source_wallet_fetcher/schemas/token.json b/source_wallet_fetcher/schemas/token.json index a871bd3..0092e2c 100644 --- a/source_wallet_fetcher/schemas/token.json +++ b/source_wallet_fetcher/schemas/token.json @@ -35,7 +35,7 @@ "balance": { "type": [ "null", - "integer" + "number" ] }, "decimal": { diff --git a/source_wallet_fetcher/source.py b/source_wallet_fetcher/source.py index 02548cb..4ef6878 100644 --- a/source_wallet_fetcher/source.py +++ b/source_wallet_fetcher/source.py @@ -3,11 +3,12 @@ # -from abc import ABC +#from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple - +from .utils import extract_token import logging import requests +import json from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream @@ -16,35 +17,9 @@ from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator logger = logging.getLogger("airbyte") # Basic full refresh stream -class WalletFetcherStream(HttpStream, ABC): - """ - TODO remove this comment - - This class represents a stream output by the connector. - This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy, - parsing responses etc.. - - Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream. - - Typically for REST APIs each stream corresponds to a resource in the API. 
For example if the API - contains the endpoints - - GET v1/customers - - GET v1/employees - - then you should have three classes: - `class WalletFetcherStream(HttpStream, ABC)` which is the current class - `class Customers(WalletFetcherStream)` contains behavior to pull data for customers using v1/customers - `class Employees(WalletFetcherStream)` contains behavior to pull data for employees using v1/employees - - If some streams implement incremental sync, it is typical to create another class - `class IncrementalWalletFetcherStream((WalletFetcherStream), ABC)` then have concrete stream implementations extend it. An example - is provided below. - - See the reference docs for the full list of configurable options. - """ - +class Token(HttpStream): # TODO: Fill in the url base. Required. - url_base = "`https://api.ethplorer.io/getAddressInfo/" + url_base = "https://api.ethplorer.io/getAddressInfo/" # Set this as a noop. primary_key = None @@ -54,145 +29,36 @@ class WalletFetcherStream(HttpStream, ABC): self.wallet_address = wallet_address def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None. - - This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed - to most other methods in this class to help you form headers, request bodies, query params, etc.. - - For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a - 'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1. - The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page']. 
- - :param response: the most recent response from the API - :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. - If there are no more pages in the result, return None. - """ return None def path(self, **kwargs) -> str: - address = self.wallet_address: + address = self.wallet_address return f"{address}?apiKey=freekey" def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - """ - TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params. - Usually contains common params e.g. pagination size etc. - """ - return {} + return {"wallet_address": self.wallet_address} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """ - TODO: Override this method to define how a response is parsed. - :return an iterable containing each record in the response - """ - yield {} - - -class Customers(WalletFetcherStream): - """ - TODO: Change class name to match the table/data source this stream corresponds to. - """ - - # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. - primary_key = "customer_id" - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - """ - TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this - should return "customers". Required. - """ - return "customers" - - -# Basic incremental stream -class IncrementalWalletFetcherStream(WalletFetcherStream, ABC): - """ - TODO fill in details of this class to implement functionality related to incremental syncs for your connector. 
- if you do not need to implement incremental sync for any streams, remove this class. - """ - - # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason. - state_checkpoint_interval = None - - @property - def cursor_field(self) -> str: - """ - TODO - Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is - usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental. - - :return str: The name of the cursor field. - """ - return [] - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and - the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental. - """ - return {} - - -class Employees(IncrementalWalletFetcherStream): - """ - TODO: Change class name to match the table/data source this stream corresponds to. - """ - - # TODO: Fill in the cursor_field. Required. - cursor_field = "start_date" - - # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. - primary_key = "employee_id" - - def path(self, **kwargs) -> str: - """ - TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should - return "single". Required. - """ - return "employees" - - def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - """ - TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. - - Slices control when state is saved. 
Specifically, state is saved after a slice has been fully read. - This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" - section of the docs for more information. - - The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the - necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. - This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. - - An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help - craft that specific request. - - For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement - this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date - till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into - the date query param. 
- """ - raise NotImplementedError("Implement stream slices or delete this method!") - - + logger.info("Getting ETH balance information") + eth_data=response.json()['ETH'] + yield { + "name":"ETH", + "symbol":"ETH", + "description": "Native Ethereum token", + "address":"", + "chain": "Ethereum", + "balance":eth_data['rawBalance'] + , "decimal":18 + } + logger.info("Fetching Tokens balance information") + tokens_data=response.json()['tokens'] + for t in tokens_data: + yield extract_token(t) # Source class SourceWalletFetcher(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: - """ - TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API - - See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 - for an example. - - :param config: the user-input config object conforming to the connector's spec.yaml - :param logger: logger object - :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. - """ - logger.info("Checking connections API") # TODO add a check for each endpoint return True, None @@ -203,5 +69,4 @@ class SourceWalletFetcher(AbstractSource): :param config: A Mapping of the user input configuration as defined in the connector spec. """ # TODO remove the authenticator if not required. 
- auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support - return [Customers(authenticator=auth), Employees(authenticator=auth)] + return [Token(wallet_address=config["wallet_address"])] diff --git a/source_wallet_fetcher/utils.py b/source_wallet_fetcher/utils.py new file mode 100644 index 0000000..f8ef1ad --- /dev/null +++ b/source_wallet_fetcher/utils.py @@ -0,0 +1,24 @@ +import logging +import json + + +def extract_token(token_data): + name= 'No Name' if 'name' not in token_data['tokenInfo'] else token_data['tokenInfo']['name'] + description= 'No description available' if 'description' not in token_data['tokenInfo'] else token_data['tokenInfo']['description'] + symbol= 'No Symbol' if 'symbol' not in token_data['tokenInfo'] else token_data['tokenInfo']['symbol'] + try: + token = { + "name": name, + "symbol": symbol, + "description": description, + "address":token_data['tokenInfo']['address'], + "chain": "Ethereum", + "balance": token_data['rawBalance'], + "decimal": token_data['tokenInfo']['decimals'] + } + return token + except KeyError: + logging.error("Error when trying to extract data from token %s" % token_data) + return None + +