# spiff-arena/spiffworkflow-backend/bin/data_migrations/version_1_3.py

import copy
import json
import os
import uuid
from hashlib import sha256
from spiffworkflow_backend import create_app
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from sqlalchemy import or_
from sqlalchemy.orm.attributes import flag_modified
class VersionOneThree:
    """Migrates data in the database to be compatible with SpiffWorkflow at git revision ebcdde95.

    Converts migration file from SpiffWorkflow to work with backend's db:
    https://github.com/sartography/SpiffWorkflow/blob/main/SpiffWorkflow/bpmn/serializer/migration/version_1_3.py

    The upstream change replaced the single ``_BoundaryEventParent`` task spec with a
    ``BoundaryEventSplit`` / ``BoundaryEventJoin`` pair, and renamed several keys inside
    serialized event definitions. This class applies the equivalent transformation to the
    backend's ``task_definition`` and ``task`` tables.
    """

    def run(self) -> None:
        """Entry point: build a minimal app context, migrate boundary-event task
        definitions and their task models, then rename event-definition keys.

        All changes are committed in a single transaction at the end.
        """
        os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "local_development"
        if os.environ.get("SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR") is None:
            # create_app requires this variable to be set; its value is not used by this migration.
            os.environ["SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR"] = "hey"
        flask_env_key = "FLASK_SESSION_SECRET_KEY"
        os.environ[flask_env_key] = "whatevs"
        app = create_app()
        with app.app_context():
            task_definitions = self.get_relevant_task_definitions()
            for task_definition in task_definitions:
                self.process_task_definition(task_definition)
                relating_task_models = TaskModel.query.filter_by(task_definition_id=task_definition.id).all()
                for task_model in relating_task_models:
                    self.process_task_model(task_model, task_definition)
            # SendTask and ReceiveTask carry event definitions too but do not match "%Event%".
            task_definitions_with_events = TaskDefinitionModel.query.filter(
                or_(
                    TaskDefinitionModel.typename.like("%Event%"),  # type: ignore
                    TaskDefinitionModel.typename.in_(["SendTask", "ReceiveTask"]),  # type: ignore
                )
            ).all()
            for tdwe in task_definitions_with_events:
                self.update_event_definitions(tdwe)
            db.session.commit()

    def get_relevant_task_definitions(self) -> list[TaskDefinitionModel]:
        """Return all task definitions using the legacy _BoundaryEventParent typename."""
        task_definitions: list[TaskDefinitionModel] = TaskDefinitionModel.query.filter_by(
            typename="_BoundaryEventParent"
        ).all()
        return task_definitions

    def process_task_definition(self, task_definition: TaskDefinitionModel) -> None:
        """Convert a _BoundaryEventParent definition into a BoundaryEventSplit and add a
        matching BoundaryEventJoin, rewiring parent and child definitions to the new names.
        """
        task_definition.typename = "BoundaryEventSplit"
        task_definition.bpmn_identifier = task_definition.bpmn_identifier.replace(
            "BoundaryEventParent", "BoundaryEventSplit"
        )
        properties_json = copy.copy(task_definition.properties_json)
        # The split spec no longer has a designated main child; the join handles convergence.
        properties_json.pop("main_child_task_spec")
        properties_json["typename"] = task_definition.typename
        properties_json["name"] = task_definition.bpmn_identifier
        task_definition.properties_json = properties_json
        flag_modified(task_definition, "properties_json")  # type: ignore
        db.session.add(task_definition)

        # Build the companion join spec: it takes the split's outputs as inputs and
        # cancels remaining branches once its threshold condition is met.
        join_properties_json = {
            "name": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
            "manual": False,
            "bpmn_id": None,
            "lookahead": 2,
            "inputs": properties_json["outputs"],
            "outputs": [],
            "split_task": task_definition.bpmn_identifier,
            "threshold": None,
            "cancel": True,
            "typename": "BoundaryEventJoin",
        }
        join_task_definition = TaskDefinitionModel(
            bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            bpmn_identifier=join_properties_json["name"],
            typename=join_properties_json["typename"],
            properties_json=join_properties_json,
        )
        db.session.add(join_task_definition)

        # Point each upstream spec's outputs at the renamed split.
        for parent_bpmn_identifier in properties_json["inputs"]:
            parent_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=parent_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            parent_task_definition.properties_json["outputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in parent_task_definition.properties_json["outputs"]
            ]
            flag_modified(parent_task_definition, "properties_json")  # type: ignore
            db.session.add(parent_task_definition)

        # Each downstream spec now also feeds the new join, and its inputs use the new split name.
        for child_bpmn_identifier in properties_json["outputs"]:
            child_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=child_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            child_task_definition.properties_json["outputs"].append(join_task_definition.bpmn_identifier)
            child_task_definition.properties_json["inputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in child_task_definition.properties_json["inputs"]
            ]
            flag_modified(child_task_definition, "properties_json")  # type: ignore
            db.session.add(child_task_definition)

    def process_task_model(self, task_model: TaskModel, task_definition: TaskDefinitionModel) -> None:
        """For each task instance of a migrated split, create the corresponding join task
        instance as a child of each (non-cancelled) child task, with a state derived from
        the children's states.

        Raises:
            Exception: if a child task is in a state this migration does not know how to map.
        """
        task_model.properties_json["task_spec"] = task_definition.bpmn_identifier
        flag_modified(task_model, "properties_json")  # type: ignore
        db.session.add(task_model)

        child_task_models = []
        all_children_completed = True
        # Ruff keeps complaining unless it's done like this
        blank_json = json.dumps({})
        blank_json_data_hash = sha256(blank_json.encode("utf8")).hexdigest()

        # First pass: collect children and determine whether every child has finished.
        for child_task_guid in task_model.properties_json["children"]:
            child_task_model = TaskModel.query.filter_by(guid=child_task_guid).first()
            if child_task_model is None:
                continue
            if child_task_model.state not in ["COMPLETED", "CANCELLED"]:
                all_children_completed = False
            child_task_models.append(child_task_model)

        # Second pass: create a join task beneath each child.
        for child_task_model in child_task_models:
            if child_task_model.state == "CANCELLED":
                # Cancelled tasks don't have children
                continue
            new_task_state = None
            start_in_seconds = child_task_model.start_in_seconds
            end_in_seconds = None
            if child_task_model.state in ["MAYBE", "LIKELY", "FUTURE"]:
                new_task_state = child_task_model.state
            elif child_task_model.state in ["WAITING", "READY", "STARTED"]:
                new_task_state = "FUTURE"
            elif child_task_model.state == "COMPLETED":
                # The join only completes once every sibling branch has finished.
                if all_children_completed:
                    new_task_state = "COMPLETED"
                    end_in_seconds = child_task_model.end_in_seconds
                else:
                    new_task_state = "WAITING"
            elif child_task_model.state == "ERROR":
                new_task_state = "WAITING"
            else:
                # FIX: was child_task_model.guild, which raised AttributeError instead of
                # producing this diagnostic message.
                raise Exception(f"Unknown state: {child_task_model.state} for {child_task_model.guid}")

            new_task_properties_json = {
                "id": str(uuid.uuid4()),
                "parent": child_task_model.guid,
                "children": [],
                "state": Task.task_state_name_to_int(new_task_state),
                "task_spec": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
                "last_state_change": None,
                "triggered": False,
                "internal_data": {},
            }
            new_task_model = TaskModel(
                guid=new_task_properties_json["id"],
                bpmn_process_id=task_model.bpmn_process_id,
                process_instance_id=task_model.process_instance_id,
                task_definition_id=task_model.task_definition_id,
                state=new_task_state,
                properties_json=new_task_properties_json,
                start_in_seconds=start_in_seconds,
                end_in_seconds=end_in_seconds,
                json_data_hash=blank_json_data_hash,
                python_env_data_hash=blank_json_data_hash,
            )
            db.session.add(new_task_model)
            child_task_model.properties_json["children"].append(new_task_model.guid)
            flag_modified(child_task_model, "properties_json")  # type: ignore
            db.session.add(child_task_model)

    def update_event_definitions(self, task_definition: TaskDefinitionModel) -> None:
        """Rename serialized event-definition keys to the new upstream schema:
        drop internal/external flags and fold escalation_code/error_code into "code".
        """
        if "event_definition" in task_definition.properties_json:
            properties_json = copy.copy(task_definition.properties_json)
            properties_json["event_definition"].pop("internal", None)
            properties_json["event_definition"].pop("external", None)
            if "escalation_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop(
                    "escalation_code"
                )
            if "error_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop("error_code")
            task_definition.properties_json = properties_json
            flag_modified(task_definition, "properties_json")  # type: ignore
            db.session.add(task_definition)
if __name__ == "__main__":
    # Guard so importing this module (e.g. to reuse VersionOneThree) does not
    # immediately run a database migration; behavior when executed as a script is unchanged.
    VersionOneThree().run()