treasure_dashboard: adding bank balance fetching and renaming airbyte connection

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2024-01-24 11:33:52 +01:00
parent d03bbb2a45
commit 8722b38d82
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
1 changed files with 15 additions and 4 deletions

View File

@ -32,7 +32,11 @@ ARGS = {
}
airbyte_connections=['blockchain-wallet-sync', 'blockchain_market_extraction']
airbyte_connections = [
'treasure-dsh-fetch-wallets-balance',
'treasure-dsh-fetch-blockchain-market',
'treasure-dsh-fetch-bank-balance'
]
@dag('treasure-dashboard-sync', schedule_interval='@daily', default_args=ARGS)
def treasure_dashboard_sync():
@ -53,7 +57,7 @@ def treasure_dashboard_sync():
fetch_wallet_data = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_blockchain_wallet',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['blockchain-wallet-sync'],
connection_id=connections_id['treasure-dsh-fetch-wallets-balance'],
asynchronous=False,
wait_seconds=3
)
@ -61,11 +65,18 @@ def treasure_dashboard_sync():
fetch_market_data = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_blockchain_market',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['blockchain_market_extraction'],
connection_id=connections_id['treasure-dsh-fetch-blockchain-market'],
asynchronous=False,
wait_seconds=3
)
connections_id >> wallets_config >> update_airbyte_config >> fetch_wallet_data >> fetch_market_data
fetch_bank_balance_data = AirbyteTriggerSyncOperator(
task_id='airbyte_fetch_bank_balance',
airbyte_conn_id='airbyte_conn',
connection_id=connections_id['treasure-dsh-fetch-bank-balance'],
asynchronous=False,
wait_seconds=3
)
connections_id >> wallets_config >> update_airbyte_config >> fetch_wallet_data >> fetch_market_data >> fetch_bank_balance_data
treasure_dashboard_sync()