diff --git a/comm_extraction.py b/comm_extraction.py
index 9acf54b..ddcf6b6 100644
--- a/comm_extraction.py
+++ b/comm_extraction.py
@@ -36,14 +36,33 @@ ARGS = {
 airbyte_connections=[
     'discord_fetcher',
-    'simplecast_fetch'
+    'simplecast_fetch',
+    'com_twitter_nomos_tech',
+    'com_twitter_acid_info',
+    'com_twitter_codex',
+    'com_twitter_ethstatus',
+    'com_twitter_logos',
+    'com_twitter_nimbus',
+    'com_twitter_waku',
 ]
+
+import time
+from airflow.decorators import task
+
+
+@task(task_id="wait_for_api")
+def wait_for_api():
+    # The Twitter API caps the number of calls allowed per 15-minute window:
+    # https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets#tab1
+    time.sleep(900)
+
+
 @dag(
     'comm_extraction',
     default_args=ARGS,
-    # Run every 4 hours
-    schedule_interval='0 */24 * * * '
+    # Run once a day at midnight ('*/24' in the hour field only matches hour 0)
+    schedule_interval='0 */24 * * *'
 )
-def forums_sync():
+def comm_extraction():
     connections_id=fetch_airbyte_connections_tg(airbyte_connections)
 
     # Trigger Airbyte fetch Data from Discourse
@@ -63,7 +82,59 @@ def forums_sync():
         asynchronous=False,
         wait_seconds=3
     )
-
-    connections_id >> [discord_fetcher, simplecast_fetch]
+    twitter_acid_info = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_acid_info',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_acid_info'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_nomos_tech = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_nomos_tech',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_nomos_tech'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_codex = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_codex',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_codex'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_logos = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_logos',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_logos'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_ethstatus = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_ethstatus',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_ethstatus'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_nimbus = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_nimbus',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_nimbus'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+    twitter_waku = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_twitter_waku',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['com_twitter_waku'],
+        asynchronous=False,
+        wait_seconds=3
+    )
 
-forums_sync()
+    # Stagger the Twitter syncs with a 15-minute pause between each pair so
+    # consecutive connections stay under the Twitter API rate limit. Each
+    # wait_for_api() call instantiates a separate sleep task (Airflow
+    # suffixes repeated TaskFlow task ids automatically).
+    connections_id >> [discord_fetcher, simplecast_fetch] >> twitter_acid_info >> wait_for_api() >> twitter_nomos_tech >> wait_for_api() >> twitter_codex >> wait_for_api() >> twitter_ethstatus >> wait_for_api() >> twitter_logos >> wait_for_api() >> twitter_waku >> wait_for_api() >> twitter_nimbus
+
+comm_extraction()
diff --git a/dbt_spiff.py b/dbt_spiff.py
new file mode 100644
index 0000000..4a2734e
--- /dev/null
+++ b/dbt_spiff.py
@@ -0,0 +1,70 @@
+import sys
+import logging
+from os import path
+from datetime import datetime, timedelta
+from airflow import DAG
+from airflow.models import Variable
+from airflow.models.param import Param
+
+from airflow.decorators import dag
+from airflow.providers.http.operators.http import SimpleHttpOperator
+from airflow.operators.bash import BashOperator
+
+# HACK: Fix for loading relative modules.
+
+sys.path.append(path.dirname(path.realpath(__file__)))
+from tasks.airbyte import fetch_airbyte_connections_tg, update_airbyte_source_config_tg
+from providers.airbyte.operator import AirbyteTriggerSyncOperator
+
+"""
+DAG to sync data for mod prod spiff environment
+"""
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+
+ARGS = {
+    'owner': 'apentori',
+    'depends_on_past': False,
+    'start_date': datetime(2023,6,1),
+    'email': ['alexis@status.im'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 0,
+    'retry_delay': timedelta(minutes=10),
+}
+
+
+airbyte_connections = [
+    'extract_spiff_backend_mod_prod',
+    'extract_spiff_connector_mod_prod'
+]
+
+@dag('mod-prod-spiff-data-sync', schedule_interval='0 0/8 * * *', default_args=ARGS)
+def mod_prod_spiff_dashboard_sync():
+
+    connections_id=fetch_airbyte_connections_tg(airbyte_connections)
+
+
+    fetch_connector_prod_data = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_connector_prod',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['extract_spiff_connector_mod_prod'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+
+    fetch_bank_spiff_backend_prod_data = AirbyteTriggerSyncOperator(
+        task_id='airbyte_fetch_backend_prod',
+        airbyte_conn_id='airbyte_conn',
+        connection_id=connections_id['extract_spiff_backend_mod_prod'],
+        asynchronous=False,
+        wait_seconds=3
+    )
+
+    dbt_run_prod_spiff = BashOperator(
+        task_id='dbt_run_models_prod_spiff',
+        bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select prod_spiff'
+    )
+
+    connections_id >> fetch_connector_prod_data >> fetch_bank_spiff_backend_prod_data >> dbt_run_prod_spiff
+
+mod_prod_spiff_dashboard_sync()
diff --git a/treasure_dashboard.py b/treasure_dashboard.py
index 33daddc..7f81064 100644
--- a/treasure_dashboard.py
+++ b/treasure_dashboard.py
@@ -39,7 +39,7 @@ airbyte_connections = [
     'treasure-dsh-fetch-coingecko'
 ]
 
-@dag('treasure-dashboard-sync', schedule_interval='0 */1 * * *', default_args=ARGS)
+@dag('treasure-dashboard-sync', schedule_interval='0 0/6 * * *', default_args=ARGS)
 def treasure_dashboard_sync():
 
     connections_id=fetch_airbyte_connections_tg(airbyte_connections)
@@ -92,7 +92,7 @@ def treasure_dashboard_sync():
 
     dbt_run_blockchain = BashOperator(
         task_id='dbt_run_models_blockchain',
-        bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select blochckain'
+        bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select blockchain'
     )
 
     dbt_run_finance = BashOperator(