diff --git a/gh_sync/main.py b/gh_sync/main.py
deleted file mode 100644
index 6bb578e..0000000
--- a/gh_sync/main.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import sys
-import json
-import logging as LOG
-from os import path
-from datetime import datetime, timedelta
-import logging
-import sys
-
-from airflow import DAG
-from airflow.models import Variable
-from airflow.models.param import Param
-from airflow.decorators import dag, task
-from airflow.operators.python import get_current_context
-
-from airflow.utils.dates import days_ago
-from airflow.providers.http.operators.http import SimpleHttpOperator
-from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
-
-"""
-DAG to fetch data from the different repo organisation in GitHub
-"""
-
-logging.basicConfig(stream=sys.stdout, level=logging.INFO)
-
-ARGS = {
-    'owner': 'apentori',
-    'depends_on_past': False,
-    'start_date': datetime(2023,6,1),
-    'email': ['alexis@status.im'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 0,
-    'retry_delay': timedelta(minutes=10),
-}
-
-with DAG('gh_repos_sync',
-         default_args=ARGS,
-         schedule_interval='30 */6 * * *',
-         catchup=False) as dag:
-
-    get_workflow_id = SimpleHttpOperator(
-        task_id='get_workflow_id',
-        http_conn_id='airbyte_conn',
-        endpoint='/api/v1/workspaces/list',
-        method="POST",
-        headers={"Content-type": "application/json", "timeout": "1200"},
-        response_filter=lambda response: response.json()["workspaces"][0]["workspaceId"],
-    )
-
-    get_connections = SimpleHttpOperator(
-        task_id='get_connections_id',
-        http_conn_id='airbyte_conn',
-        endpoint='/api/v1/connections/list',
-        method="POST",
-        headers={"Content-type": "application/json", "timeout": "1200"},
-        data=json.dumps(
-            {"workspaceId": f"{get_workflow_id.output}"}
-        ),
-        response_filter= lambda response: response.json()["connections"]
-    )
-
-    @dag.task(task_id="extracts_conn_id", multiple_outputs=True)
-    def extract_conn_id(output):
-        logging.info('Connection ID %s' % output)
-        backend_conn=list(filter(lambda x: x['name'] == 'gh_sync_logos_repos', output))
-        return {
-            "gh_logos": f"{backend_conn[0]['connectionId']}",
-        }
-
-    connections_id = extract_conn_id(get_connections.output)
-
-    airbyte_fetch_logos = AirbyteTriggerSyncOperator(
-        task_id='airbyte_fetch_github',
-        airbyte_conn_id='airbyte_conn',
-        connection_id=connections_id['gh_logos'],
-        asynchronous=False,
-        wait_seconds=3
-    )
-
-    get_workflow_id >> get_connections >> connections_id >> airbyte_fetch_logos
diff --git a/github_extraction.py b/github_extraction.py
new file mode 100644
index 0000000..7b7f976
--- /dev/null
+++ b/github_extraction.py
@@ -0,0 +1,53 @@
+import sys
+import json
+from os import path
+from datetime import datetime, timedelta
+import logging
+
+from airflow import DAG
+from airflow.decorators import dag, task
+
+from airflow.utils.dates import days_ago
+from airflow.providers.http.operators.http import SimpleHttpOperator
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+
+# HACK: Fix for loading relative modules.
+sys.path.append(path.dirname(path.realpath(__file__)))
+from tasks.airbyte import fetch_airbyte_connections_tg
+from providers.airbyte.operator import AirbyteTriggerSyncOperator
+
+"""
+DAG to fetch data from the different repo organisation in GitHub
+"""
+
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+
+ARGS = {
+    'owner': 'apentori',
+    'depends_on_past': False,
+    'start_date': datetime(2023,6,1),
+    'email': ['alexis@status.im'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 0,
+    'retry_delay': timedelta(minutes=10),
+}
+
+airbyte_connections=['gh_sync_logos_repos']
+
+@dag('github_repo_extraction',
+     default_args=ARGS,
+     schedule_interval='30 */6 * * *')
+def github_repo_extraction():
+    connections_id=fetch_airbyte_connections_tg(airbyte_connections)
+
+
+    AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_github',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['gh_sync_logos_repos'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+
+github_repo_extraction()
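Note: tasks.airbyte.fetch_airbyte_connections_tg and providers.airbyte.operator.AirbyteTriggerSyncOperator are imported from elsewhere in the repository and are not part of this diff. As orientation only, below is a hypothetical sketch of what fetch_airbyte_connections_tg could look like, assuming it simply moves into a task group the same workspace/connection lookups that the deleted gh_sync/main.py performed inline with SimpleHttpOperator; the task names get_workspace_id, get_connections and extract_conn_ids are illustrative, and the real helper in tasks/airbyte.py may differ.

# Hypothetical sketch of tasks/airbyte.py -- not part of this diff.
# Assumption: the task group reproduces the lookups the deleted DAG did inline
# (POST /api/v1/workspaces/list, POST /api/v1/connections/list, then map the
# requested connection names to their Airbyte connection IDs).
import json
import logging

from airflow.decorators import task, task_group
from airflow.providers.http.operators.http import SimpleHttpOperator


@task_group(group_id='fetch_airbyte_connections')
def fetch_airbyte_connections_tg(connection_names):
    # Resolve the (single) Airbyte workspace ID.
    get_workspace_id = SimpleHttpOperator(
        task_id='get_workspace_id',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/workspaces/list',
        method='POST',
        headers={'Content-type': 'application/json', 'timeout': '1200'},
        response_filter=lambda response: response.json()['workspaces'][0]['workspaceId'],
    )

    # List all connections in that workspace.
    get_connections = SimpleHttpOperator(
        task_id='get_connections',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/connections/list',
        method='POST',
        headers={'Content-type': 'application/json', 'timeout': '1200'},
        data=json.dumps({'workspaceId': f'{get_workspace_id.output}'}),
        response_filter=lambda response: response.json()['connections'],
    )

    @task(multiple_outputs=True)
    def extract_conn_ids(connections):
        # Keep only the connections the caller asked for, keyed by name,
        # so callers can index the result like connections_id['gh_sync_logos_repos'].
        wanted = {c['name']: c['connectionId'] for c in connections if c['name'] in connection_names}
        logging.info('Resolved Airbyte connection IDs: %s', wanted)
        return wanted

    get_workspace_id >> get_connections
    return extract_conn_ids(get_connections.output)

Factoring the lookup out this way means each extraction DAG only declares the Airbyte connection names it needs and the sync operators that consume the resolved IDs, which is how github_repo_extraction() uses it above.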