From 7616397549bc56c81fb1cbe4f7fd3042232123d2 Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Wed, 29 Nov 2023 14:18:41 +0100 Subject: [PATCH] Initial commit Signed-off-by: Alexis Pentori --- .gitignore | 160 ++++++++++++++ README.md | 166 +++++++++++++++ acceptance-test-config.yml | 39 ++++ integration_tests/__init__.py | 3 + integration_tests/abnormal_state.json | 5 + integration_tests/acceptance.py | 16 ++ integration_tests/configured_catalog.json | 22 ++ integration_tests/invalid_config.json | 3 + integration_tests/sample_config.json | 3 + integration_tests/sample_state.json | 5 + main.py | 13 ++ metadata.yaml | 30 +++ requirements.txt | 1 + secrets/config.json | 3 + setup.py | 30 +++ source_wallet_fetcher/__init__.py | 8 + source_wallet_fetcher/schemas/TODO.md | 25 +++ source_wallet_fetcher/schemas/customers.json | 16 ++ source_wallet_fetcher/schemas/employees.json | 19 ++ source_wallet_fetcher/schemas/token.json | 48 +++++ source_wallet_fetcher/source.py | 207 +++++++++++++++++++ source_wallet_fetcher/spec.yaml | 16 ++ unit_tests/__init__.py | 3 + unit_tests/test_incremental_streams.py | 59 ++++++ unit_tests/test_source.py | 22 ++ unit_tests/test_streams.py | 83 ++++++++ 26 files changed, 1005 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 acceptance-test-config.yml create mode 100644 integration_tests/__init__.py create mode 100644 integration_tests/abnormal_state.json create mode 100644 integration_tests/acceptance.py create mode 100644 integration_tests/configured_catalog.json create mode 100644 integration_tests/invalid_config.json create mode 100644 integration_tests/sample_config.json create mode 100644 integration_tests/sample_state.json create mode 100644 main.py create mode 100644 metadata.yaml create mode 100644 requirements.txt create mode 100644 secrets/config.json create mode 100644 setup.py create mode 100644 source_wallet_fetcher/__init__.py create mode 100644 source_wallet_fetcher/schemas/TODO.md create mode 100644 source_wallet_fetcher/schemas/customers.json create mode 100644 source_wallet_fetcher/schemas/employees.json create mode 100644 source_wallet_fetcher/schemas/token.json create mode 100644 source_wallet_fetcher/source.py create mode 100644 source_wallet_fetcher/spec.yaml create mode 100644 unit_tests/__init__.py create mode 100644 unit_tests/test_incremental_streams.py create mode 100644 unit_tests/test_source.py create mode 100644 unit_tests/test_streams.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68bc17f --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..abddfff --- /dev/null +++ b/README.md @@ -0,0 +1,166 @@ +# Wallet Fetcher Source + +This is the repository for the Wallet Fetcher source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/wallet-fetcher). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/wallet-fetcher) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_wallet_fetcher/spec.yaml` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source wallet-fetcher test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Use `airbyte-ci` to build your connector +The Airbyte way of building this connector is to use our `airbyte-ci` tool. +You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1). +Then running the following command will build your connector: + +```bash +airbyte-ci connectors --name source-wallet-fetcher build +``` +Once the command is done, you will find your connector image in your local docker registry: `airbyte/source-wallet-fetcher:dev`. + +##### Customizing our build process +When contributing on our connector you might need to customize the build process to add a system dependency or set an env var. +You can customize our build process by adding a `build_customization.py` module to your connector. +This module should contain a `pre_connector_install` and `post_connector_install` async function that will mutate the base image and the connector container respectively. +It will be imported at runtime by our build process and the functions will be called if they exist. + +Here is an example of a `build_customization.py` module: +```python +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + # Feel free to check the dagger documentation for more information on the Container object and its methods. + # https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/ + from dagger import Container + + +async def pre_connector_install(base_image_container: Container) -> Container: + return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value") + +async def post_connector_install(connector_container: Container) -> Container: + return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value") +``` + +#### Build your own connector image +This connector is built using our dynamic built process in `airbyte-ci`. +The base image used to build it is defined within the metadata.yaml file under the `connectorBuildOptions`. +The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py). +It does not rely on a Dockerfile. + +If you would like to patch our connector and build your own a simple approach would be to: + +1. Create your own Dockerfile based on the latest version of the connector image. +```Dockerfile +FROM airbyte/source-wallet-fetcher:latest + +COPY . ./airbyte/integration_code +RUN pip install ./airbyte/integration_code + +# The entrypoint and default env vars are already set in the base image +# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] +``` +Please use this as an example. This is not optimized. + +2. Build your image: +```bash +docker build -t airbyte/source-wallet-fetcher:dev . +# Running the spec command against your patched connector +docker run airbyte/source-wallet-fetcher:dev spec +```` + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-wallet-fetcher:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-wallet-fetcher:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-wallet-fetcher:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-wallet-fetcher:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` + +### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +Please run acceptance tests via [airbyte-ci](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command): +```bash +airbyte-ci connectors --name source-wallet-fetcher test +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=source-wallet-fetcher test` +2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors). +3. Make sure the `metadata.yaml` content is up to date. +4. Make the connector documentation and its changelog is up to date (`docs/integrations/sources/wallet-fetcher.md`). +5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention). +6. Pat yourself on the back for being an awesome contributor. +7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. + diff --git a/acceptance-test-config.yml b/acceptance-test-config.yml new file mode 100644 index 0000000..5dcb4f2 --- /dev/null +++ b/acceptance-test-config.yml @@ -0,0 +1,39 @@ +# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-wallet-fetcher:dev +acceptance_tests: + spec: + tests: + - spec_path: "source_wallet_fetcher/spec.yaml" + connection: + tests: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + tests: + - config_path: "secrets/config.json" + basic_read: + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] +# TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file a file +# expect_records: +# path: "integration_tests/expected_records.jsonl" +# extra_fields: no +# exact_order: no +# extra_records: yes + incremental: + bypass_reason: "This connector does not implement incremental sync" +# TODO uncomment this block this block if your connector implements incremental sync: +# tests: +# - config_path: "secrets/config.json" +# configured_catalog_path: "integration_tests/configured_catalog.json" +# future_state: +# future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/integration_tests/__init__.py b/integration_tests/__init__.py new file mode 100644 index 0000000..c941b30 --- /dev/null +++ b/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/integration_tests/abnormal_state.json b/integration_tests/abnormal_state.json new file mode 100644 index 0000000..52b0f2c --- /dev/null +++ b/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "todo-abnormal-value" + } +} diff --git a/integration_tests/acceptance.py b/integration_tests/acceptance.py new file mode 100644 index 0000000..9e64092 --- /dev/null +++ b/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("connector_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/integration_tests/configured_catalog.json b/integration_tests/configured_catalog.json new file mode 100644 index 0000000..36f0468 --- /dev/null +++ b/integration_tests/configured_catalog.json @@ -0,0 +1,22 @@ +{ + "streams": [ + { + "stream": { + "name": "customers", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "employees", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/integration_tests/invalid_config.json b/integration_tests/invalid_config.json new file mode 100644 index 0000000..f373299 --- /dev/null +++ b/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "todo-wrong-field": "this should be an incomplete config file, used in standard tests" +} diff --git a/integration_tests/sample_config.json b/integration_tests/sample_config.json new file mode 100644 index 0000000..ecc4913 --- /dev/null +++ b/integration_tests/sample_config.json @@ -0,0 +1,3 @@ +{ + "fix-me": "TODO" +} diff --git a/integration_tests/sample_state.json b/integration_tests/sample_state.json new file mode 100644 index 0000000..3587e57 --- /dev/null +++ b/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "value" + } +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..cf2a3b5 --- /dev/null +++ b/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_wallet_fetcher import SourceWalletFetcher + +if __name__ == "__main__": + source = SourceWalletFetcher() + launch(source, sys.argv[1:]) diff --git a/metadata.yaml b/metadata.yaml new file mode 100644 index 0000000..75fd457 --- /dev/null +++ b/metadata.yaml @@ -0,0 +1,30 @@ +data: + allowedHosts: + hosts: + - TODO # Please change to the hostname of the source. + registries: + oss: + enabled: false + cloud: + enabled: false + connectorBuildOptions: + # Please update to the latest version of the connector base image. + # https://hub.docker.com/r/airbyte/python-connector-base + # Please use the full address with sha256 hash to guarantee build reproducibility. + baseImage: docker.io/airbyte/python-connector-base:1.0.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27 + connectorSubtype: api + connectorType: source + definitionId: 1e55cfe0-f591-4281-9a20-18d89d45f685 + dockerImageTag: 0.1.0 + dockerRepository: airbyte/source-wallet-fetcher + githubIssueLabel: source-wallet-fetcher + icon: wallet-fetcher.svg + license: MIT + name: Wallet Fetcher + releaseDate: TODO + supportLevel: community + releaseStage: alpha + documentationUrl: https://docs.airbyte.com/integrations/sources/wallet-fetcher + tags: + - language:python +metadataSpecVersion: "1.0" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d6e1198 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/secrets/config.json b/secrets/config.json new file mode 100644 index 0000000..f5f8933 --- /dev/null +++ b/secrets/config.json @@ -0,0 +1,3 @@ +{ + "fix-me": "TODO populate with needed configuration for integration tests or delete this file and any references to it. The schema of this file should match what is in your spec.yaml" +} diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c12f514 --- /dev/null +++ b/setup.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.2", +] + +TEST_REQUIREMENTS = [ + "requests-mock~=1.9.3", + "pytest~=6.2", + "pytest-mock~=3.6.1", + "connector-acceptance-test", +] + +setup( + name="source_wallet_fetcher", + description="Source implementation for Wallet Fetcher.", + author="Airbyte", + author_email="devops@status.im", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/source_wallet_fetcher/__init__.py b/source_wallet_fetcher/__init__.py new file mode 100644 index 0000000..47b650d --- /dev/null +++ b/source_wallet_fetcher/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceWalletFetcher + +__all__ = ["SourceWalletFetcher"] diff --git a/source_wallet_fetcher/schemas/TODO.md b/source_wallet_fetcher/schemas/TODO.md new file mode 100644 index 0000000..cf1efad --- /dev/null +++ b/source_wallet_fetcher/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. + +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. Up to you :) diff --git a/source_wallet_fetcher/schemas/customers.json b/source_wallet_fetcher/schemas/customers.json new file mode 100644 index 0000000..9a4b134 --- /dev/null +++ b/source_wallet_fetcher/schemas/customers.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "signup_date": { + "type": ["null", "string"], + "format": "date-time" + } + } +} diff --git a/source_wallet_fetcher/schemas/employees.json b/source_wallet_fetcher/schemas/employees.json new file mode 100644 index 0000000..2fa01a0 --- /dev/null +++ b/source_wallet_fetcher/schemas/employees.json @@ -0,0 +1,19 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "years_of_service": { + "type": ["null", "integer"] + }, + "start_date": { + "type": ["null", "string"], + "format": "date-time" + } + } +} diff --git a/source_wallet_fetcher/schemas/token.json b/source_wallet_fetcher/schemas/token.json new file mode 100644 index 0000000..a871bd3 --- /dev/null +++ b/source_wallet_fetcher/schemas/token.json @@ -0,0 +1,48 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "symbol": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "address": { + "type": [ + "null", + "string" + ] + }, + "chain": { + "type": [ + "null", + "string" + ] + }, + "balance": { + "type": [ + "null", + "integer" + ] + }, + "decimal": { + "type": [ + "null", + "integer" + ] + } + } +} diff --git a/source_wallet_fetcher/source.py b/source_wallet_fetcher/source.py new file mode 100644 index 0000000..02548cb --- /dev/null +++ b/source_wallet_fetcher/source.py @@ -0,0 +1,207 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple + +import logging +import requests +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator + +logger = logging.getLogger("airbyte") + +# Basic full refresh stream +class WalletFetcherStream(HttpStream, ABC): + """ + TODO remove this comment + + This class represents a stream output by the connector. + This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy, + parsing responses etc.. + + Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream. + + Typically for REST APIs each stream corresponds to a resource in the API. For example if the API + contains the endpoints + - GET v1/customers + - GET v1/employees + + then you should have three classes: + `class WalletFetcherStream(HttpStream, ABC)` which is the current class + `class Customers(WalletFetcherStream)` contains behavior to pull data for customers using v1/customers + `class Employees(WalletFetcherStream)` contains behavior to pull data for employees using v1/employees + + If some streams implement incremental sync, it is typical to create another class + `class IncrementalWalletFetcherStream((WalletFetcherStream), ABC)` then have concrete stream implementations extend it. An example + is provided below. + + See the reference docs for the full list of configurable options. + """ + + # TODO: Fill in the url base. Required. + url_base = "`https://api.ethplorer.io/getAddressInfo/" + + # Set this as a noop. + primary_key = None + + def __init__(self, wallet_address: str, **kwargs): + super().__init__(**kwargs) + self.wallet_address = wallet_address + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None. + + This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed + to most other methods in this class to help you form headers, request bodies, query params, etc.. + + For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a + 'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1. + The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page']. + + :param response: the most recent response from the API + :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. + If there are no more pages in the result, return None. + """ + return None + + def path(self, **kwargs) -> str: + address = self.wallet_address: + return f"{address}?apiKey=freekey" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + """ + TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params. + Usually contains common params e.g. pagination size etc. + """ + return {} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + TODO: Override this method to define how a response is parsed. + :return an iterable containing each record in the response + """ + yield {} + + +class Customers(WalletFetcherStream): + """ + TODO: Change class name to match the table/data source this stream corresponds to. + """ + + # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. + primary_key = "customer_id" + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + """ + TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this + should return "customers". Required. + """ + return "customers" + + +# Basic incremental stream +class IncrementalWalletFetcherStream(WalletFetcherStream, ABC): + """ + TODO fill in details of this class to implement functionality related to incremental syncs for your connector. + if you do not need to implement incremental sync for any streams, remove this class. + """ + + # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason. + state_checkpoint_interval = None + + @property + def cursor_field(self) -> str: + """ + TODO + Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is + usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental. + + :return str: The name of the cursor field. + """ + return [] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and + the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental. + """ + return {} + + +class Employees(IncrementalWalletFetcherStream): + """ + TODO: Change class name to match the table/data source this stream corresponds to. + """ + + # TODO: Fill in the cursor_field. Required. + cursor_field = "start_date" + + # TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp. + primary_key = "employee_id" + + def path(self, **kwargs) -> str: + """ + TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should + return "single". Required. + """ + return "employees" + + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + """ + TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method. + + Slices control when state is saved. Specifically, state is saved after a slice has been fully read. + This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts" + section of the docs for more information. + + The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the + necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created. + This means that data in a slice is usually closely related to a stream's cursor_field and stream_state. + + An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help + craft that specific request. + + For example, if https://example-api.com/v1/employees offers a date query params that returns data for that particular day, one way to implement + this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date + till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into + the date query param. + """ + raise NotImplementedError("Implement stream slices or delete this method!") + + +# Source +class SourceWalletFetcher(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API + + See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 + for an example. + + :param config: the user-input config object conforming to the connector's spec.yaml + :param logger: logger object + :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. + """ + logger.info("Checking connections API") + # TODO add a check for each endpoint + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + """ + TODO: Replace the streams below with your own streams. + + :param config: A Mapping of the user input configuration as defined in the connector spec. + """ + # TODO remove the authenticator if not required. + auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support + return [Customers(authenticator=auth), Employees(authenticator=auth)] diff --git a/source_wallet_fetcher/spec.yaml b/source_wallet_fetcher/spec.yaml new file mode 100644 index 0000000..e0cbe3e --- /dev/null +++ b/source_wallet_fetcher/spec.yaml @@ -0,0 +1,16 @@ +documentationUrl: https://docsurl.com +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Wallet Fetcher Spec + type: object + required: + - wallet_address + # - chains + properties: + wallet_address: + # TODO: change to List to handle multiple wallets + type: string + description: Adress of the wallet + pattern: ^[a-zA-W0-9]+$ + examples: + - '0x766c77F7f7edC99acdC9475012756B98037a8F69' diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py new file mode 100644 index 0000000..c941b30 --- /dev/null +++ b/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/unit_tests/test_incremental_streams.py b/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000..b972736 --- /dev/null +++ b/unit_tests/test_incremental_streams.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_wallet_fetcher.source import IncrementalWalletFetcherStream + + +@fixture +def patch_incremental_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(IncrementalWalletFetcherStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalWalletFetcherStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalWalletFetcherStream, "__abstractmethods__", set()) + + +def test_cursor_field(patch_incremental_base_class): + stream = IncrementalWalletFetcherStream() + # TODO: replace this with your expected cursor field + expected_cursor_field = [] + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(patch_incremental_base_class): + stream = IncrementalWalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"current_stream_state": None, "latest_record": None} + # TODO: replace this with your expected updated stream state + expected_state = {} + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(patch_incremental_base_class): + stream = IncrementalWalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} + # TODO: replace this with your expected stream slices list + expected_stream_slice = [None] + assert stream.stream_slices(**inputs) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker): + mocker.patch.object(IncrementalWalletFetcherStream, "cursor_field", "dummy_field") + stream = IncrementalWalletFetcherStream() + assert stream.supports_incremental + + +def test_source_defined_cursor(patch_incremental_base_class): + stream = IncrementalWalletFetcherStream() + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(patch_incremental_base_class): + stream = IncrementalWalletFetcherStream() + # TODO: replace this with your expected checkpoint interval + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/unit_tests/test_source.py b/unit_tests/test_source.py new file mode 100644 index 0000000..88d92dd --- /dev/null +++ b/unit_tests/test_source.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from source_wallet_fetcher.source import SourceWalletFetcher + + +def test_check_connection(mocker): + source = SourceWalletFetcher() + logger_mock, config_mock = MagicMock(), MagicMock() + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +def test_streams(mocker): + source = SourceWalletFetcher() + config_mock = MagicMock() + streams = source.streams(config_mock) + # TODO: replace this with your streams number + expected_streams_number = 2 + assert len(streams) == expected_streams_number diff --git a/unit_tests/test_streams.py b/unit_tests/test_streams.py new file mode 100644 index 0000000..b929181 --- /dev/null +++ b/unit_tests/test_streams.py @@ -0,0 +1,83 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_wallet_fetcher.source import WalletFetcherStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(WalletFetcherStream, "path", "v0/example_endpoint") + mocker.patch.object(WalletFetcherStream, "primary_key", "test_primary_key") + mocker.patch.object(WalletFetcherStream, "__abstractmethods__", set()) + + +def test_request_params(patch_base_class): + stream = WalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request parameters + expected_params = {} + assert stream.request_params(**inputs) == expected_params + + +def test_next_page_token(patch_base_class): + stream = WalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected next page token + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class): + stream = WalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected parced object + expected_parsed_object = {} + assert next(stream.parse_response(**inputs)) == expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = WalletFetcherStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request headers + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = WalletFetcherStream() + # TODO: replace this with your expected http request method + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = WalletFetcherStream() + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = WalletFetcherStream() + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time