# mirror of https://github.com/sartography/spiff-arena.git
# synced 2025-02-11 17:07:18 +00:00
import copy
|
|
import json
|
|
import os
|
|
import uuid
|
|
from hashlib import sha256
|
|
|
|
from spiffworkflow_backend import create_app
|
|
from spiffworkflow_backend.models.db import db
|
|
from spiffworkflow_backend.models.task import Task
|
|
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
|
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.orm.attributes import flag_modified
|
|
|
|
|
|
class VersionOneThree:
    """Migrates data in the database to be compatible with SpiffWorkflow at git revision ebcdde95.

    Converts migration file from SpiffWorkflow to work with backend's db:
    https://github.com/sartography/SpiffWorkflow/blob/main/SpiffWorkflow/bpmn/serializer/migration/version_1_3.py

    The SpiffWorkflow 1.3 serializer migration replaces each `_BoundaryEventParent`
    task spec with a `BoundaryEventSplit` / `BoundaryEventJoin` pair; this class
    applies the equivalent transformation directly to the backend's database rows.
    """

    def run(self) -> None:
        """Entry point: configure the environment, build the Flask app, and migrate all rows.

        Commits the session once at the end so the whole migration is applied atomically.
        """
        os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "local_development"
        if os.environ.get("SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR") is None:
            # create_app requires this var to be set; the actual value is irrelevant
            # for a db-only migration, so any placeholder will do.
            os.environ["SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR"] = "hey"
        flask_env_key = "FLASK_SESSION_SECRET_KEY"
        os.environ[flask_env_key] = "whatevs"
        app = create_app()
        with app.app_context():
            task_definitions = self.get_relevant_task_definitions()
            for task_definition in task_definitions:
                self.process_task_definition(task_definition)
                relating_task_models = TaskModel.query.filter_by(task_definition_id=task_definition.id).all()
                for task_model in relating_task_models:
                    self.process_task_model(task_model, task_definition)

            # Event-style task definitions need their event_definition payloads
            # normalized as well (see update_event_definitions).
            task_definitions_with_events = TaskDefinitionModel.query.filter(
                or_(
                    TaskDefinitionModel.typename.like("%Event%"),  # type: ignore
                    TaskDefinitionModel.typename.in_(["SendTask", "ReceiveTask"]),  # type: ignore
                )
            ).all()
            for tdwe in task_definitions_with_events:
                self.update_event_definitions(tdwe)

            db.session.commit()

    def get_relevant_task_definitions(self) -> list[TaskDefinitionModel]:
        """Return all task definitions of the legacy `_BoundaryEventParent` type."""
        task_definitions: list[TaskDefinitionModel] = TaskDefinitionModel.query.filter_by(
            typename="_BoundaryEventParent"
        ).all()
        return task_definitions

    def process_task_definition(self, task_definition: TaskDefinitionModel) -> None:
        """Convert a `_BoundaryEventParent` definition into a split/join pair.

        Renames the given definition to `BoundaryEventSplit`, creates a matching
        `BoundaryEventJoin` definition, and rewires the inputs/outputs of the
        surrounding task definitions to reference the new identifiers.
        """
        task_definition.typename = "BoundaryEventSplit"
        task_definition.bpmn_identifier = task_definition.bpmn_identifier.replace(
            "BoundaryEventParent", "BoundaryEventSplit"
        )

        # Shallow copy is sufficient: only top-level keys are added/removed here.
        properties_json = copy.copy(task_definition.properties_json)
        properties_json.pop("main_child_task_spec")
        properties_json["typename"] = task_definition.typename
        properties_json["name"] = task_definition.bpmn_identifier
        task_definition.properties_json = properties_json
        flag_modified(task_definition, "properties_json")  # type: ignore
        db.session.add(task_definition)

        # Build the companion join spec. The field values mirror what
        # SpiffWorkflow's version_1_3 serializer migration emits for BoundaryEventJoin.
        join_properties_json = {
            "name": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
            "manual": False,
            "bpmn_id": None,
            "lookahead": 2,
            "inputs": properties_json["outputs"],
            "outputs": [],
            "split_task": task_definition.bpmn_identifier,
            "threshold": None,
            "cancel": True,
            "typename": "BoundaryEventJoin",
        }

        join_task_definition = TaskDefinitionModel(
            bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            bpmn_identifier=join_properties_json["name"],
            typename=join_properties_json["typename"],
            properties_json=join_properties_json,
        )
        db.session.add(join_task_definition)

        # Parents of the old BoundaryEventParent must now point at the split.
        for parent_bpmn_identifier in properties_json["inputs"]:
            # NOTE(review): .first() may return None if the referenced definition
            # is missing; that would raise AttributeError below — confirm data integrity.
            parent_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=parent_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            parent_task_definition.properties_json["outputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in parent_task_definition.properties_json["outputs"]
            ]
            flag_modified(parent_task_definition, "properties_json")  # type: ignore
            db.session.add(parent_task_definition)

        # Children of the split gain the join as an output and point their
        # inputs at the renamed split.
        for child_bpmn_identifier in properties_json["outputs"]:
            child_task_definition = TaskDefinitionModel.query.filter_by(
                bpmn_identifier=child_bpmn_identifier,
                bpmn_process_definition_id=task_definition.bpmn_process_definition_id,
            ).first()
            child_task_definition.properties_json["outputs"].append(join_task_definition.bpmn_identifier)
            child_task_definition.properties_json["inputs"] = [
                name.replace("BoundaryEventParent", "BoundaryEventSplit")
                for name in child_task_definition.properties_json["inputs"]
            ]
            flag_modified(child_task_definition, "properties_json")  # type: ignore
            db.session.add(child_task_definition)

    def process_task_model(self, task_model: TaskModel, task_definition: TaskDefinitionModel) -> None:
        """Update a task instance for the renamed split and create join task instances.

        Each non-cancelled child of the (former) boundary-event-parent task gets a
        new join task appended as its child, with a state derived from the child's
        own state and whether all children have completed.
        """
        task_model.properties_json["task_spec"] = task_definition.bpmn_identifier
        flag_modified(task_model, "properties_json")  # type: ignore
        db.session.add(task_model)

        child_task_models = []
        all_children_completed = True

        # Ruff keeps complaining unless it's done like this
        blank_json = json.dumps({})
        blank_json_data_hash = sha256(blank_json.encode("utf8")).hexdigest()

        # Collect the still-existing children and determine whether every one of
        # them has reached a terminal state.
        for child_task_guid in task_model.properties_json["children"]:
            child_task_model = TaskModel.query.filter_by(guid=child_task_guid).first()
            if child_task_model is None:
                continue
            if child_task_model.state not in ["COMPLETED", "CANCELLED"]:
                all_children_completed = False
            child_task_models.append(child_task_model)

        for child_task_model in child_task_models:
            if child_task_model.state == "CANCELLED":
                # Cancelled tasks don't have children
                continue

            new_task_state = None
            start_in_seconds = child_task_model.start_in_seconds
            end_in_seconds = None

            # Map the child's state to the state of its new join task.
            if child_task_model.state in ["MAYBE", "LIKELY", "FUTURE"]:
                new_task_state = child_task_model.state
            elif child_task_model.state in ["WAITING", "READY", "STARTED"]:
                new_task_state = "FUTURE"
            elif child_task_model.state == "COMPLETED":
                if all_children_completed:
                    new_task_state = "COMPLETED"
                    end_in_seconds = child_task_model.end_in_seconds
                else:
                    # The join cannot complete until every sibling has finished.
                    new_task_state = "WAITING"
            elif child_task_model.state == "ERROR":
                new_task_state = "WAITING"
            else:
                # BUG FIX: was `child_task_model.guild`, which raised AttributeError
                # instead of the intended error message.
                raise Exception(f"Unknown state: {child_task_model.state} for {child_task_model.guid}")

            new_task_properties_json = {
                "id": str(uuid.uuid4()),
                "parent": child_task_model.guid,
                "children": [],
                "state": Task.task_state_name_to_int(new_task_state),
                "task_spec": task_definition.bpmn_identifier.replace("BoundaryEventSplit", "BoundaryEventJoin"),
                "last_state_change": None,
                "triggered": False,
                "internal_data": {},
            }

            new_task_model = TaskModel(
                guid=new_task_properties_json["id"],
                bpmn_process_id=task_model.bpmn_process_id,
                process_instance_id=task_model.process_instance_id,
                task_definition_id=task_model.task_definition_id,
                state=new_task_state,
                properties_json=new_task_properties_json,
                start_in_seconds=start_in_seconds,
                end_in_seconds=end_in_seconds,
                json_data_hash=blank_json_data_hash,
                python_env_data_hash=blank_json_data_hash,
            )
            db.session.add(new_task_model)

            child_task_model.properties_json["children"].append(new_task_model.guid)
            flag_modified(child_task_model, "properties_json")  # type: ignore
            db.session.add(child_task_model)

    def update_event_definitions(self, task_definition: TaskDefinitionModel) -> None:
        """Normalize an event definition's payload to the 1.3 serializer format.

        Drops the obsolete `internal`/`external` flags and renames
        `escalation_code`/`error_code` to the unified `code` key.
        """
        if "event_definition" in task_definition.properties_json:
            properties_json = copy.copy(task_definition.properties_json)
            properties_json["event_definition"].pop("internal", None)
            properties_json["event_definition"].pop("external", None)
            if "escalation_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop(
                    "escalation_code"
                )
            if "error_code" in properties_json["event_definition"]:
                properties_json["event_definition"]["code"] = properties_json["event_definition"].pop("error_code")
            task_definition.properties_json = properties_json
            flag_modified(task_definition, "properties_json")  # type: ignore
            db.session.add(task_definition)
|
|
|
|
|
|
# Guard the entry point so importing this module does not trigger the migration.
if __name__ == "__main__":
    VersionOneThree().run()
|