protect future_task foreign key add operation from running when bad data (#1770)

Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
Kevin Burnett 2024-06-19 15:38:01 +00:00 committed by GitHub
parent 7a18be179c
commit 15628278d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 37 additions and 5 deletions

View File

@ -7,6 +7,8 @@ Create Date: 2024-06-18 16:45:06.102210
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
from spiffworkflow_backend.models.db import dialect_name
# revision identifiers, used by Alembic.
@ -15,16 +17,46 @@ down_revision = '43afc70a7016'
branch_labels = None
depends_on = None
def delete_orphaned_future_tasks() -> None:
# Ensure database connection
conn = op.get_bind()
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
dialect = dialect_name()
if dialect == "mysql":
delete_query = text("""
DELETE future_task FROM future_task
LEFT JOIN task ON future_task.guid = task.guid
WHERE task.guid IS NULL;
""")
elif dialect == "postgresql":
delete_query = text("""
DELETE FROM future_task
USING task
WHERE future_task.guid = task.guid
AND task.guid IS NULL;
""")
elif dialect == "sqlite":
delete_query = text("""
DELETE FROM future_task
WHERE guid NOT IN (
SELECT guid FROM task
);
""")
else:
raise ValueError(f"Unsupported database dialect: {dialect}")
# Execute the delete query
conn.execute(delete_query)
def upgrade() -> None:
delete_orphaned_future_tasks()
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.create_foreign_key('future_task_task_guid_fk', 'task', ['guid'], ['guid'], ondelete='CASCADE')
# ### end Alembic commands ###
def downgrade():
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.drop_constraint('future_task_task_guid_fk', type_='foreignkey')