Refactoring DAG for spiff data extractions

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2023-12-14 13:50:52 +01:00
parent e750b74d5c
commit 39cc101b0c
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
2 changed files with 57 additions and 116 deletions

57
spiff_extraction.py Normal file
View File

@ -0,0 +1,57 @@
import sys
from os import path
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
# HACK: Fix for loading relative modules.
sys.path.append(path.dirname(path.realpath(__file__)))
from tasks.airbyte import fetch_airbyte_connections_tg
from providers.airbyte.operator import AirbyteTriggerSyncOperator
"""
DAG to fetch data from Spiff workflow
"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
ARGS = {
'owner': 'apentori',
'depends_on_past': False,
'start_date': datetime(2023,6,1),
'email': ['alexis@status.im'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=10),
}
airbyte_connections=['extract_spiff_backend_mod_prod', 'extract_spiff_connector_mod_prod']
@dag('spiff_extraction',
     default_args=ARGS,
     schedule_interval='30 */6 * * *')
def spiff_extraction():
    """Trigger the Airbyte sync jobs extracting the Spiff workflow data.

    Resolves the connection IDs for the names in ``airbyte_connections``,
    then fires one synchronous Airbyte sync per connection.
    """
    # Task group resolving each connection name to its Airbyte connection ID.
    # Assumes the resulting mapping is keyed by the full connection names
    # passed in (as the pre-refactor extract_conn_id task did) — TODO confirm
    # against tasks.airbyte.fetch_airbyte_connections_tg.
    connections_id = fetch_airbyte_connections_tg(airbyte_connections)

    # Trigger Airbyte sync for Spiff Backend DB.
    # BUG FIX: lookups previously used 'extract_spiff_backend' /
    # 'extract_spiff_connector', which do not match the '..._mod_prod'
    # names requested above and would raise a KeyError.
    AirbyteTriggerSyncOperator(
        task_id='airbyte_sync_spiff_backend',
        airbyte_conn_id='airbyte_conn',
        connection_id=connections_id['extract_spiff_backend_mod_prod'],
        asynchronous=False,
        wait_seconds=3
    )

    # Trigger Airbyte sync for Spiff Connector DB.
    AirbyteTriggerSyncOperator(
        task_id='airbyte_sync_spiff_connector',
        airbyte_conn_id='airbyte_conn',
        connection_id=connections_id['extract_spiff_connector_mod_prod'],
        asynchronous=False,
        wait_seconds=3
    )

spiff_extraction()

View File

@ -1,116 +0,0 @@
import sys
import json
import logging as LOG
from os import path
from datetime import datetime, timedelta
import logging
import sys
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago
from airflow.providers.http.operators.http import SimpleHttpOperator
"""
DAG to fetch data from Spiff workflow
"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
ARGS = {
'owner': 'apentori',
'depends_on_past': False,
'start_date': datetime(2023,6,1),
'email': ['alexis@status.im'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=10),
}
with DAG('spiff_sync',
         default_args=ARGS,
         # BUG FIX: '0 /6 * * *' is not valid cron syntax; "minute 0 of
         # every 6th hour" needs the step marker '*/6'.
         schedule_interval='0 */6 * * *',
         catchup=False) as dag:

    # Resolve the Airbyte workspace ID; the connection listing below needs it.
    get_workflow_id = SimpleHttpOperator(
        task_id='get_workflow_id',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/workspaces/list',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        response_filter=lambda response: response.json()["workspaces"][0]["workspaceId"],
    )
    get_workflow_id.doc_md = """\
## Getting the workflow ID
This task calls the Airbyte API to get the workflow ID.
It is necessary to get the connections ID for triggering the jobs in Airbyte.
"""

    # List all connections of the workspace resolved above.
    get_connections = SimpleHttpOperator(
        task_id='get_connections_id',
        # BUG FIX: was 'airbyte_conn_example' — leftover sample config;
        # every other task in this DAG uses the 'airbyte_conn' connection.
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/connections/list',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        data=json.dumps(
            {"workspaceId": f"{get_workflow_id.output}"}
        ),
        response_filter=lambda response: response.json()["connections"]
    )
    get_connections.doc_md = """\
## Getting Connections
This task calls the Airbyte API to get the connections ID.
It is necessary to get the connections ID for triggering the jobs in Airbyte.
"""

    @dag.task(task_id="extracts_conn_id", multiple_outputs=True)
    def extract_conn_id(output):
        """Map the two Spiff connection names to their Airbyte connection IDs.

        ``output`` is the ``connections`` list returned by the Airbyte API;
        each entry is expected to carry ``name`` and ``connectionId``.
        """
        logging.info('Connection ID %s' % output)
        backend_conn = list(filter(lambda x: x['name'] == 'extract_spiff_backend_mod_prod', output))
        connector_conn = list(filter(lambda x: x['name'] == 'extract_spiff_connector_mod_prod', output))
        return {
            "extract_spiff_backend_mod_prod": f"{backend_conn[0]['connectionId']}",
            "extract_spiff_connector_mod_prod": f"{connector_conn[0]['connectionId']}"
        }

    connections_id = extract_conn_id(get_connections.output)

    # Trigger Airbyte sync for Spiff Backend DB
    airbyte_sync_spiff_backend = SimpleHttpOperator(
        task_id='airbyte_sync_spiff_backend',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/connections/sync',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        data=json.dumps(
            {"connectionId": f"{connections_id['extract_spiff_backend_mod_prod']}"}
        )
    )

    # Trigger Airbyte Sync for Spiff Connector DB
    airbyte_sync_spiff_connector = SimpleHttpOperator(
        task_id='airbyte_sync_spiff_connector',
        http_conn_id='airbyte_conn',
        endpoint='/api/v1/connections/sync',
        method="POST",
        headers={"Content-type": "application/json", "timeout": "1200"},
        data=json.dumps(
            {"connectionId": f"{connections_id['extract_spiff_connector_mod_prod']}"}
        )
    )

    get_workflow_id >> get_connections >> connections_id >> airbyte_sync_spiff_backend >> airbyte_sync_spiff_connector

# NOTE(review): __doc__ is None here because the file's descriptive string
# comes after the imports; move it to line 1 for this assignment to work.
dag.doc_md = __doc__