attempt to support on conflict inserts for postgres as well w/ burnettk

This commit is contained in:
jasquat 2023-03-13 17:35:03 -04:00
parent 1bbe4e68e6
commit a9f8e113cb
No known key found for this signature in database
1 changed files with 29 additions and 13 deletions

View File

@ -1,5 +1,6 @@
import json import json
from hashlib import sha256 from hashlib import sha256
from flask import current_app
from typing import TypedDict from typing import TypedDict
from typing import Optional from typing import Optional
from typing import Tuple from typing import Tuple
@ -15,7 +16,8 @@ from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from sqlalchemy.dialects.mysql import insert from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert
class JsonDataDict(TypedDict): class JsonDataDict(TypedDict):
@ -28,18 +30,32 @@ class TaskService:
def insert_or_update_json_data_records(cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]) -> None: def insert_or_update_json_data_records(cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]) -> None:
list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()] list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()]
if len(list_of_dicts) > 0: if len(list_of_dicts) > 0:
# db.session.execute(JsonDataModel.__table__.insert(), list_of_dicts) # >>> from sqlalchemy.dialects.postgresql import insert
# >>> insert_stmt = insert(my_table).values(
# insert_stmt = insert(my_table).values( # ... id='some_existing_id',
# id='some_existing_id', # ... data='inserted value')
# data='inserted value') # >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
insert_stmt = insert(JsonDataModel).values(list_of_dicts) # ... index_elements=['id']
# ... )
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( # >>> print(do_nothing_stmt)
data=insert_stmt.inserted.data, # INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
status='U' # ON CONFLICT (id) DO NOTHING
) # >>> do_update_stmt = insert_stmt.on_conflict_do_update(
# ... constraint='pk_my_table',
# ... set_=dict(data='updated value')
# ... )
on_duplicate_key_stmt = None
if current_app.config['SPIFFWORKFLOW_BACKEND_DATABASE_TYPE'] == "mysql":
insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
else:
insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(
index_elements=['hash']
)
db.session.execute(on_duplicate_key_stmt) db.session.execute(on_duplicate_key_stmt)
@classmethod @classmethod