Finishing basic Eth wallet import

Making basic wallet extraction working
  Adding Dockerfile

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2023-11-30 11:02:08 +01:00
parent 7616397549
commit 967db04e30
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
8 changed files with 82 additions and 194 deletions

8
Dockerfile Normal file
View File

@ -0,0 +1,8 @@
FROM airbyte/python-connector-base:1.1.0
COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code
# The entrypoint and default env vars are already set in the base image
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

View File

@ -0,0 +1,23 @@
{
"streams": [
{
"stream": {
"name": "token",
"json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"wallet_address": {
"type": "string"
}
}
},
"supported_sync_modes": [
"full_refresh"
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

3
sample_files/wallet.json Normal file
View File

@ -0,0 +1,3 @@
{
"wallet_address": "0x23f4569002a5A07f0Ecf688142eEB6bcD883eeF8"
}

View File

@ -1,16 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"signup_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
}

View File

@ -1,19 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"years_of_service": {
"type": ["null", "integer"]
},
"start_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
}

View File

@ -35,7 +35,7 @@
"balance": { "balance": {
"type": [ "type": [
"null", "null",
"integer" "numeric"
] ]
}, },
"decimal": { "decimal": {

View File

@ -3,11 +3,12 @@
# #
from abc import ABC #from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from .utils import extract_token
import logging import logging
import requests import requests
import json
from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http import HttpStream
@ -16,35 +17,9 @@ from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
logger = logging.getLogger("airbyte") logger = logging.getLogger("airbyte")
# Basic full refresh stream # Basic full refresh stream
class WalletFetcherStream(HttpStream, ABC): class Token(HttpStream):
"""
TODO remove this comment
This class represents a stream output by the connector.
This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
parsing responses etc..
Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.
Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
contains the endpoints
- GET v1/customers
- GET v1/employees
then you should have three classes:
`class WalletFetcherStream(HttpStream, ABC)` which is the current class
`class Customers(WalletFetcherStream)` contains behavior to pull data for customers using v1/customers
`class Employees(WalletFetcherStream)` contains behavior to pull data for employees using v1/employees
If some streams implement incremental sync, it is typical to create another class
`class IncrementalWalletFetcherStream((WalletFetcherStream), ABC)` then have concrete stream implementations extend it. An example
is provided below.
See the reference docs for the full list of configurable options.
"""
# TODO: Fill in the url base. Required. # TODO: Fill in the url base. Required.
url_base = "`https://api.ethplorer.io/getAddressInfo/" url_base = "https://api.ethplorer.io/getAddressInfo/"
# Set this as a noop. # Set this as a noop.
primary_key = None primary_key = None
@ -54,145 +29,36 @@ class WalletFetcherStream(HttpStream, ABC):
self.wallet_address = wallet_address self.wallet_address = wallet_address
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.
This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
to most other methods in this class to help you form headers, request bodies, query params, etc..
For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].
:param response: the most recent response from the API
:return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
If there are no more pages in the result, return None.
"""
return None return None
def path(self, **kwargs) -> str: def path(self, **kwargs) -> str:
address = self.wallet_address: address = self.wallet_address
return f"{address}?apiKey=freekey" return f"{address}?apiKey=freekey"
def request_params( def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]: ) -> MutableMapping[str, Any]:
""" return {"wallet_address": self.wallet_address}
TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
Usually contains common params e.g. pagination size etc.
"""
return {}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
""" logger.info("Getting ETH balance information")
TODO: Override this method to define how a response is parsed. eth_data=response.json()['ETH']
:return an iterable containing each record in the response yield {
""" "name":"ETH",
yield {} "symbol":"ETH",
"description": "Native Ethereum token",
"address":"",
class Customers(WalletFetcherStream): "chain": "Ethereum",
""" "balance":eth_data['rawBalance']
TODO: Change class name to match the table/data source this stream corresponds to. , "decimal":18
""" }
logging.info("Fetching Tokens balance information")
# TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. tokens_data=response.json()['tokens']
primary_key = "customer_id" for t in tokens_data:
yield extract_token(t)
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
"""
TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this
should return "customers". Required.
"""
return "customers"
# Basic incremental stream
class IncrementalWalletFetcherStream(WalletFetcherStream, ABC):
"""
TODO fill in details of this class to implement functionality related to incremental syncs for your connector.
if you do not need to implement incremental sync for any streams, remove this class.
"""
# TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason.
state_checkpoint_interval = None
@property
def cursor_field(self) -> str:
"""
TODO
Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental.
:return str: The name of the cursor field.
"""
return []
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
"""
return {}
class Employees(IncrementalWalletFetcherStream):
"""
TODO: Change class name to match the table/data source this stream corresponds to.
"""
# TODO: Fill in the cursor_field. Required.
cursor_field = "start_date"
# TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp.
primary_key = "employee_id"
def path(self, **kwargs) -> str:
"""
TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should
return "single". Required.
"""
return "employees"
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
"""
TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method.
Slices control when state is saved. Specifically, state is saved after a slice has been fully read.
This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts"
section of the docs for more information.
The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the
necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created.
This means that data in a slice is usually closely related to a stream's cursor_field and stream_state.
An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help
craft that specific request.
For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement
this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date
till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into
the date query param.
"""
raise NotImplementedError("Implement stream slices or delete this method!")
# Source # Source
class SourceWalletFetcher(AbstractSource): class SourceWalletFetcher(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]: def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API
See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
for an example.
:param config: the user-input config object conforming to the connector's spec.yaml
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
"""
logger.info("Checking connections API")
# TODO add a check for each endpoint # TODO add a check for each endpoint
return True, None return True, None
@ -203,5 +69,4 @@ class SourceWalletFetcher(AbstractSource):
:param config: A Mapping of the user input configuration as defined in the connector spec. :param config: A Mapping of the user input configuration as defined in the connector spec.
""" """
# TODO remove the authenticator if not required. # TODO remove the authenticator if not required.
auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support return [Token(wallet_address=config["wallet_address"])]
return [Customers(authenticator=auth), Employees(authenticator=auth)]

View File

@ -0,0 +1,24 @@
import logging
import json
def extract_token(token_data):
name= 'No Name' if 'name' not in token_data['tokenInfo'] else token_data['tokenInfo']['name']
description= 'No description available' if 'description' not in token_data['tokenInfo'] else token_data['tokenInfo']['description']
symbol= 'No Symbol' if 'symbol' not in token_data['tokenInfo'] else token_data['tokenInfo']['symbol']
try:
token = {
"name": name,
"symbol": symbol,
"description": description,
"address":token_data['tokenInfo']['address'],
"chain": "Ethereum",
"balance": token_data['rawBalance'],
"decimal": token_data['tokenInfo']['decimals']
}
return token
except KeyError:
logging.error("Error when trying to extract data from token %s" % tokens_data)
return None