Refactoring DAG for GitHub data extraction

Signed-off-by: Alexis Pentori <alexis@status.im>
Alexis Pentori 2023-12-14 13:43:55 +01:00
parent d54d0dabc9
commit e750b74d5c
2 changed files with 53 additions and 80 deletions


@@ -1,80 +0,0 @@
import sys
import json
import logging
from os import path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
"""
DAG to fetch data from the different repository organisations on GitHub.
"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
ARGS = {
    'owner': 'apentori',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 1),
    'email': ['alexis@status.im'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=10),
}

with DAG('gh_repos_sync',
         default_args=ARGS,
         schedule_interval='30 */6 * * *',
         catchup=False) as dag:

    get_workflow_id = SimpleHttpOperator(
        task_id='get_workflow_id',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/workspaces/list',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        response_filter=lambda response: response.json()["workspaces"][0]["workspaceId"],
    )

    get_connections = SimpleHttpOperator(
        task_id='get_connections_id',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/connections/list',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        data=json.dumps(
            {"workspaceId": f"{get_workflow_id.output}"}
        ),
        response_filter=lambda response: response.json()["connections"],
    )

    @dag.task(task_id="extracts_conn_id", multiple_outputs=True)
    def extract_conn_id(output):
        logging.info('Connection ID %s', output)
        backend_conn = list(filter(lambda x: x['name'] == 'gh_sync_logos_repos', output))
        return {
            "gh_logos": f"{backend_conn[0]['connectionId']}",
        }

    connections_id = extract_conn_id(get_connections.output)

    airbyte_fetch_logos = AirbyteTriggerSyncOperator(
        task_id='airbyte_fetch_github',
        airbyte_conn_id='airbyte_conn',
        connection_id=connections_id['gh_logos'],
        asynchronous=False,
        wait_seconds=3
    )

    get_workflow_id >> get_connections >> connections_id >> airbyte_fetch_logos

github_extraction.py (new file, 53 additions)

@@ -0,0 +1,53 @@
import sys
import json
from os import path
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
# HACK: Fix for loading relative modules.
sys.path.append(path.dirname(path.realpath(__file__)))
from tasks.airbyte import fetch_airbyte_connections_tg
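# Repo-local operator; being imported last, it takes precedence over the
# provider's AirbyteTriggerSyncOperator imported above.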
from providers.airbyte.operator import AirbyteTriggerSyncOperator
"""
DAG to fetch data from the different repository organisations on GitHub.
"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
ARGS = {
    'owner': 'apentori',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 1),
    'email': ['alexis@status.im'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=10),
}

airbyte_connections = ['gh_sync_logos_repos']


@dag('github_repo_extraction',
     default_args=ARGS,
     schedule_interval='30 */6 * * *')
def github_repo_extraction():
    connections_id = fetch_airbyte_connections_tg(airbyte_connections)

    AirbyteTriggerSyncOperator(
        task_id='airbyte_fetch_github',
        airbyte_conn_id='airbyte_conn',
        connection_id=connections_id['gh_sync_logos_repos'],
        asynchronous=False,
        wait_seconds=3
    )


github_repo_extraction()
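
The refactored DAG delegates the Airbyte workspace/connection lookup to a shared fetch_airbyte_connections_tg task group imported from tasks/airbyte.py, which is not part of this diff. Below is a minimal sketch of what such a task group could look like, assuming it calls the same Airbyte endpoints the deleted DAG used (/api/v1/workspaces/list and /api/v1/connections/list) and returns a mapping of connection name to connectionId, so the DAG can index it by name as connections_id['gh_sync_logos_repos']. All names and details here are illustrative, not the actual contents of tasks/airbyte.py.

# Hypothetical sketch of tasks/airbyte.py; this commit depends on it but does
# not include it. Assumes the Airbyte REST endpoints used by the deleted DAG
# and returns a {connection name: connectionId} mapping.
import json
import logging

from airflow.decorators import task, task_group
from airflow.providers.http.operators.http import SimpleHttpOperator


@task_group(group_id="fetch_airbyte_connections")
def fetch_airbyte_connections_tg(connection_names):
    # Resolve the (single) Airbyte workspace ID.
    get_workspace_id = SimpleHttpOperator(
        task_id="get_workspace_id",
        http_conn_id="airbyte_conn",
        endpoint="/api/v1/workspaces/list",
        method="POST",
        headers={"Content-type": "application/json"},
        response_filter=lambda response: response.json()["workspaces"][0]["workspaceId"],
    )

    # List all connections in that workspace; the workspace ID is passed via
    # the templated XCom reference, as in the deleted DAG.
    get_connections = SimpleHttpOperator(
        task_id="get_connections",
        http_conn_id="airbyte_conn",
        endpoint="/api/v1/connections/list",
        method="POST",
        headers={"Content-type": "application/json"},
        data=json.dumps({"workspaceId": f"{get_workspace_id.output}"}),
        response_filter=lambda response: response.json()["connections"],
    )

    @task(multiple_outputs=True)
    def extract_ids(connections, names):
        # Keep only the requested connections, keyed by their Airbyte name.
        logging.info("Airbyte connections: %s", connections)
        return {
            c["name"]: c["connectionId"]
            for c in connections
            if c["name"] in names
        }

    get_workspace_id >> get_connections
    return extract_ids(get_connections.output, connection_names)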