Fixed typo and schedule for Treasure Sync and created Mod Prod Spiff dag sync (#6)

* comm_extraction: fetch data from Twitter accounts

Signed-off-by: Alexis Pentori <alexis@status.im>

* Fixed typo and updated schedule to avoid API rate limits

- Had a typo when declaring the bash operator for blockchain models.

- Changed the schedule to every 6 hours since the API is hitting rate limits for the wallet data.

* Created dag to sync mod prod spiff data and models

---------

Signed-off-by: Alexis Pentori <alexis@status.im>
Co-authored-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
José Anaya 2024-04-23 10:58:07 +02:00 committed by GitHub
parent 99ab33846e
commit 061ecbabb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 141 additions and 7 deletions

View File

@ -36,16 +36,31 @@ ARGS = {
airbyte_connections=[
'discord_fetcher',
'simplecast_fetch'
'simplecast_fetch',
'com_twitter_nomos_tech',
'com_twitter_acid_info',
'com_twitter_codex',
'com_twitter_ethstatus',
'com_twitter_logos',
'com_twitter_nimbus',
'com_twitter_waku',
]
@task(task_id="wait_for_api")
def wait_for_api():
    """Sleep for one full rate-limit window before the next Twitter sync."""
    # The Twitter API limits the number of calls allowed per 15-minute window,
    # so wait 900 s (one full window) between consecutive Airbyte Twitter syncs:
    # https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets#tab1
    time.sleep(900)
@dag(
'comm_extraction',
default_args=ARGS,
# Run every 24 hours (the cron expression below fires once a day, at hour 0)
schedule_interval='0 */24 * * * '
)
def forums_sync():
def comm_extraction():
connections_id=fetch_airbyte_connections_tg(airbyte_connections)
# Trigger Airbyte fetch Data from Discourse
@ -63,7 +78,57 @@ def forums_sync():
asynchronous=False,
wait_seconds=3
)
connections_id >> [discord_fetcher, simplecast_fetch]
twitter_acid_info = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_acid_info',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_acid_info'],
asynchronous=False,
wait_seconds=3
)
twitter_nomos_tech = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_nomos_tech',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_nomos_tech'],
asynchronous=False,
wait_seconds=3
)
twitter_codex = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_codex',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_codex'],
asynchronous=False,
wait_seconds=3
)
twitter_logos = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_logos',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_logos'],
asynchronous=False,
wait_seconds=3
)
twitter_ethstatus = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_ethstatus',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_ethstatus'],
asynchronous=False,
wait_seconds=3
)
twitter_nimbus = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_nimbus',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_nimbus'],
asynchronous=False,
wait_seconds=3
)
twitter_waku = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_twitter_waku',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['com_twitter_waku'],
asynchronous=False,
wait_seconds=3
)
forums_sync()
connections_id >> [discord_fetcher, simplecast_fetch] >> twitter_acid_info >> wait_for_api >> twitter_nomos_tech >> wait_for_api >> twitter_codex >> wait_for_api >> twitter_ethstatus >> wait_for_api >> twitter_logos >> wait_for_api >> twitter_waku >> wait_for_api >> twitter_nimbus >> wait_for_api
comm_extraction()

69
dbt_spiff.py Normal file
View File

@ -0,0 +1,69 @@
import sys
import logging
from os import path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.decorators import dag
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.bash_operator import BashOperator
# HACK: Fix for loading relative modules.
sys.path.append(path.dirname(path.realpath(__file__)))
from tasks.airbyte import fetch_airbyte_connections_tg, update_airbyte_source_config_tg
from providers.airbyte.operator import AirbyteTriggerSyncOperator
"""
DAG to sync data for mod prod spiff environment
"""
logging.basicConfig(stream=sys.stdout, level=logging.info)
ARGS = {
'owner': 'apentori',
'depends_on_past': False,
'start_date': datetime(2023,6,1),
'email': ['alexis@status.im'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=10),
}
airbyte_connections = [
'extract_spiff_backend_mod_prod',
'extract_spiff_connector_mod_prod'
]
@dag('mod-prod-spiff-data-sync', schedule_interval='0 0/8 * * *', default_args=ARGS)
def mod_prod_spiff_dashboard_sync():
    """Fetch mod prod spiff data through Airbyte, then rebuild the dbt models.

    The two Airbyte syncs run sequentially (connector first, then backend),
    followed by a dbt run restricted to the prod_spiff models.
    """
    # Resolve the Airbyte connection ids for the configured connection names.
    resolved_ids = fetch_airbyte_connections_tg(airbyte_connections)

    # Trigger the Airbyte sync of the spiff connector (mod prod).
    sync_connector = AirbyteTriggerSyncOperator(
        task_id='airbyte_fetch_connector_prod',
        airbyte_conn_id='airbyte_conn',
        connection_id=resolved_ids['extract_spiff_connector_mod_prod'],
        asynchronous=False,
        wait_seconds=3,
    )

    # Trigger the Airbyte sync of the spiff backend (mod prod).
    sync_backend = AirbyteTriggerSyncOperator(
        task_id='airbyte_fetch_backend_prod',
        airbyte_conn_id='airbyte_conn',
        connection_id=resolved_ids['extract_spiff_backend_mod_prod'],
        asynchronous=False,
        wait_seconds=3,
    )

    # Rebuild only the prod_spiff dbt models once both syncs have landed.
    run_models = BashOperator(
        task_id='dbt_run_models_prod_spiff',
        bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select prod_spiff',
    )

    # Chain: resolve ids -> connector sync -> backend sync -> dbt run.
    resolved_ids >> sync_connector >> sync_backend >> run_models


mod_prod_spiff_dashboard_sync()

View File

@ -39,7 +39,7 @@ airbyte_connections = [
'treasure-dsh-fetch-coingecko'
]
@dag('treasure-dashboard-sync', schedule_interval='0 */1 * * *', default_args=ARGS)
@dag('treasure-dashboard-sync', schedule_interval='0 0/6 * * *', default_args=ARGS)
def treasure_dashboard_sync():
connections_id=fetch_airbyte_connections_tg(airbyte_connections)
@ -92,7 +92,7 @@ def treasure_dashboard_sync():
dbt_run_blockchain = BashOperator(
task_id='dbt_run_models_blockchain',
bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select blochckain'
bash_command='dbt run --profiles-dir /dbt --project-dir /dbt/dbt-models/ --select blockchain'
)
dbt_run_finance = BashOperator(