diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index de103aa9..0b2d73b3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1101,7 +1101,7 @@ class ProcessInstanceProcessor: self._add_bpmn_process_definitions(bpmn_spec_dict) subprocesses = process_instance_data_dict.pop("subprocesses") - bpmn_process_parent, new_task_models, new_json_data_models = ( + bpmn_process_parent, new_task_models, new_json_data_dicts = ( TaskService.add_bpmn_process( process_instance_data_dict, self.process_instance_model ) @@ -1118,9 +1118,10 @@ class ProcessInstanceProcessor: bpmn_process_guid=subprocess_task_id, ) new_task_models.update(subprocess_new_task_models) - new_json_data_models.update(subprocess_new_json_data_models) + new_json_data_dicts.update(subprocess_new_json_data_models) db.session.bulk_save_objects(new_task_models.values()) - db.session.bulk_save_objects(new_json_data_models.values()) + + TaskService.insert_or_update_json_data_records(new_json_data_dicts) def save(self) -> None: """Saves the current state of this processor to the database.""" @@ -1908,11 +1909,18 @@ class ProcessInstanceProcessor: db.session.add(details_model) # ####### - json_data = TaskService.update_task_model( + json_data_dict = TaskService.update_task_model( task_model, spiff_task, self._serializer ) - if json_data is not None: - db.session.add(json_data) + if json_data_dict is not None: + json_data = ( + db.session.query(JsonDataModel.id) + .filter_by(hash=json_data_dict["hash"]) + .first() + ) + if json_data is None: + json_data = JsonDataModel(**json_data_dict) + db.session.add(json_data) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) self.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 402ff8ee..dbd0a912 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -2,11 +2,15 @@ import json from hashlib import sha256 from typing import Optional from typing import Tuple +from typing import TypedDict +from flask import current_app from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskStateNames +from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.dialects.postgresql import insert as postgres_insert from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db @@ -15,25 +19,42 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +class JsonDataDict(TypedDict): + hash: str + data: dict + + class TaskService: @classmethod - def update_task_data_on_task_model( + def insert_or_update_json_data_records( + cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] + ) -> None: + list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()] + if len(list_of_dicts) > 0: + on_duplicate_key_stmt = None + if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": + insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts) + on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( + data=insert_stmt.inserted.data, status="U" + ) + else: + insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts) + on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing( + index_elements=["hash"] + ) + db.session.execute(on_duplicate_key_stmt) + + @classmethod + def _update_task_data_on_task_model( cls, task_model: TaskModel, task_data_dict: dict - ) -> Optional[JsonDataModel]: + ) -> Optional[JsonDataDict]: task_data_json = json.dumps(task_data_dict, sort_keys=True) - task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest() - json_data_to_return = None + task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() + json_data_dict: Optional[JsonDataDict] = None if task_model.json_data_hash != task_data_hash: - json_data = ( - db.session.query(JsonDataModel.id) - .filter_by(hash=task_data_hash) - .first() - ) - if json_data is None: - json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict) - json_data_to_return = json_data + json_data_dict = {"hash": task_data_hash, "data": task_data_dict} task_model.json_data_hash = task_data_hash - return json_data_to_return + return json_data_dict @classmethod def update_task_model( @@ -41,7 +62,7 @@ class TaskService: task_model: TaskModel, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer, - ) -> Optional[JsonDataModel]: + ) -> Optional[JsonDataDict]: """Updates properties_json and data on given task_model. This will NOT update start_in_seconds or end_in_seconds. @@ -51,8 +72,10 @@ class TaskService: spiff_task_data = new_properties_json.pop("data") task_model.properties_json = new_properties_json task_model.state = TaskStateNames[new_properties_json["state"]] - json_data = cls.update_task_data_on_task_model(task_model, spiff_task_data) - return json_data + json_data_dict = cls._update_task_data_on_task_model( + task_model, spiff_task_data + ) + return json_data_dict @classmethod def find_or_create_task_model_from_spiff_task( @@ -64,7 +87,7 @@ class TaskService: Optional[BpmnProcessModel], TaskModel, dict[str, TaskModel], - dict[str, JsonDataModel], + dict[str, JsonDataDict], ]: spiff_task_guid = str(spiff_task.id) task_model: Optional[TaskModel] = TaskModel.query.filter_by( @@ -72,9 +95,9 @@ class TaskService: ).first() bpmn_process = None new_task_models: dict[str, TaskModel] = {} - new_json_data_models: dict[str, JsonDataModel] = {} + new_json_data_dicts: dict[str, JsonDataDict] = {} if task_model is None: - bpmn_process, new_task_models, new_json_data_models = cls.task_bpmn_process( + bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( spiff_task, process_instance, serializer ) task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() @@ -82,7 +105,7 @@ class TaskService: task_model = TaskModel( guid=spiff_task_guid, bpmn_process_id=bpmn_process.id ) - return (bpmn_process, task_model, new_task_models, new_json_data_models) + return (bpmn_process, task_model, new_task_models, new_json_data_dicts) @classmethod def task_subprocess( @@ -107,17 +130,17 @@ class TaskService: spiff_task: SpiffTask, process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, - ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]: + ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: subprocess_guid, subprocess = cls.task_subprocess(spiff_task) bpmn_process: Optional[BpmnProcessModel] = None new_task_models: dict[str, TaskModel] = {} - new_json_data_models: dict[str, JsonDataModel] = {} + new_json_data_dicts: dict[str, JsonDataDict] = {} if subprocess is None: bpmn_process = process_instance.bpmn_process # This is the top level workflow, which has no guid # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None if process_instance.bpmn_process_id is None: - bpmn_process, new_task_models, new_json_data_models = ( + bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( serializer.workflow_to_dict( spiff_task.workflow._get_outermost_workflow() @@ -130,7 +153,7 @@ class TaskService: guid=subprocess_guid ).first() if bpmn_process is None: - bpmn_process, new_task_models, new_json_data_models = ( + bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( serializer.workflow_to_dict(subprocess), process_instance, @@ -138,7 +161,7 @@ class TaskService: subprocess_guid, ) ) - return (bpmn_process, new_task_models, new_json_data_models) + return (bpmn_process, new_task_models, new_json_data_dicts) @classmethod def add_bpmn_process( @@ -147,7 +170,7 @@ class TaskService: process_instance: ProcessInstanceModel, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None, - ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]: + ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: """This creates and adds a bpmn_process to the Db session. It will also add tasks and relating json_data entries if the bpmn_process is new. @@ -157,7 +180,7 @@ class TaskService: bpmn_process_data_dict = bpmn_process_dict.pop("data") new_task_models = {} - new_json_data_models = {} + new_json_data_dicts: dict[str, JsonDataDict] = {} bpmn_process = None if bpmn_process_parent is not None: @@ -179,16 +202,10 @@ class TaskService: bpmn_process_data_json.encode("utf8") ).hexdigest() if bpmn_process.json_data_hash != bpmn_process_data_hash: - json_data = ( - db.session.query(JsonDataModel.id) - .filter_by(hash=bpmn_process_data_hash) - .first() - ) - if json_data is None: - json_data = JsonDataModel( - hash=bpmn_process_data_hash, data=bpmn_process_data_dict - ) - new_json_data_models[bpmn_process_data_hash] = json_data + new_json_data_dicts[bpmn_process_data_hash] = { + "hash": bpmn_process_data_hash, + "data": bpmn_process_data_dict, + } bpmn_process.json_data_hash = bpmn_process_data_hash if bpmn_process_parent is None: @@ -220,11 +237,11 @@ class TaskService: task_model.state = TaskStateNames[state_int] task_model.properties_json = task_properties - json_data = TaskService.update_task_data_on_task_model( + json_data_dict = TaskService._update_task_data_on_task_model( task_model, task_data_dict ) new_task_models[task_model.guid] = task_model - if json_data is not None: - new_json_data_models[json_data.hash] = json_data + if json_data_dict is not None: + new_json_data_dicts[json_data_dict["hash"]] = json_data_dict - return (bpmn_process, new_task_models, new_json_data_models) + return (bpmn_process, new_task_models, new_json_data_dicts) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 2f90d5b8..864885e5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -12,7 +12,6 @@ from SpiffWorkflow.task import TaskState from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.db import db -from spiffworkflow_backend.models.json_data import JsonDataModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( MessageInstanceCorrelationRuleModel, @@ -20,6 +19,7 @@ from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +from spiffworkflow_backend.services.task_service import JsonDataDict from spiffworkflow_backend.services.task_service import TaskService @@ -60,7 +60,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.current_task_model: Optional[TaskModel] = None self.task_models: dict[str, TaskModel] = {} - self.json_data_models: dict[str, JsonDataModel] = {} + self.json_data_dicts: dict[str, JsonDataDict] = {} self.serializer = serializer def should_update_task_model(self) -> bool: @@ -72,14 +72,14 @@ class TaskModelSavingDelegate(EngineStepDelegate): def will_complete_task(self, spiff_task: SpiffTask) -> None: if self.should_update_task_model(): - _bpmn_process, task_model, new_task_models, new_json_data_models = ( + _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( spiff_task, self.process_instance, self.serializer ) ) self.current_task_model = task_model self.task_models.update(new_task_models) - self.json_data_models.update(new_json_data_models) + self.json_data_dicts.update(new_json_data_dicts) self.current_task_model.start_in_seconds = time.time() if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.will_complete_task(spiff_task) @@ -87,18 +87,20 @@ class TaskModelSavingDelegate(EngineStepDelegate): def did_complete_task(self, spiff_task: SpiffTask) -> None: if self.current_task_model and self.should_update_task_model(): self.current_task_model.end_in_seconds = time.time() - json_data = TaskService.update_task_model( + json_data_dict = TaskService.update_task_model( self.current_task_model, spiff_task, self.serializer ) - if json_data is not None: - self.json_data_models[json_data.hash] = json_data + if json_data_dict is not None: + self.json_data_dicts[json_data_dict["hash"]] = json_data_dict self.task_models[self.current_task_model.guid] = self.current_task_model if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) def save(self, _commit: bool = True) -> None: db.session.bulk_save_objects(self.task_models.values()) - db.session.bulk_save_objects(self.json_data_models.values()) + + TaskService.insert_or_update_json_data_records(self.json_data_dicts) + if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.save(commit=False) db.session.commit() @@ -113,19 +115,19 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.MAYBE | TaskState.LIKELY ): - _bpmn_process, task_model, new_task_models, new_json_data_models = ( + _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( waiting_spiff_task, self.process_instance, self.serializer ) ) self.task_models.update(new_task_models) - self.json_data_models.update(new_json_data_models) - json_data = TaskService.update_task_model( + self.json_data_dicts.update(new_json_data_dicts) + json_data_dict = TaskService.update_task_model( task_model, waiting_spiff_task, self.serializer ) self.task_models[task_model.guid] = task_model - if json_data is not None: - self.json_data_models[json_data.hash] = json_data + if json_data_dict is not None: + self.json_data_dicts[json_data_dict["hash"]] = json_data_dict class StepDetailLoggingDelegate(EngineStepDelegate):