diff --git a/.flake8 b/.flake8
index 9c54dc0e7..6e5fa533b 100644
--- a/.flake8
+++ b/.flake8
@@ -2,6 +2,7 @@
 select = B,B9,C,D,DAR,E,F,N,RST,S,W
 ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
 max-line-length = 120
+extend-ignore = E203
 max-complexity = 30
 docstring-convention = google
 rst-roles = class,const,func,meth,mod,ref
diff --git a/spiffworkflow-backend/.flake8 b/spiffworkflow-backend/.flake8
index 2e6554e58..d73f1dba4 100644
--- a/spiffworkflow-backend/.flake8
+++ b/spiffworkflow-backend/.flake8
@@ -2,6 +2,7 @@
 select = B,B9,C,D,DAR,E,F,N,RST,S,W
 ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
 max-line-length = 120
+extend-ignore = E203
 max-complexity = 30
 docstring-convention = google
 rst-roles = class,const,func,meth,mod,ref
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index dec1e993d..589940d3f 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -1084,16 +1084,20 @@ class ProcessInstanceProcessor:
         self._add_bpmn_process_definitions(bpmn_spec_dict)

         subprocesses = process_instance_data_dict.pop("subprocesses")
-        bpmn_process_parent = TaskService.add_bpmn_process(
+        bpmn_process_parent, new_task_models, new_json_data_models = TaskService.add_bpmn_process(
             process_instance_data_dict, self.process_instance_model
         )
         for subprocess_task_id, subprocess_properties in subprocesses.items():
-            TaskService.add_bpmn_process(
+            _bpmn_subprocess, subprocess_new_task_models, subprocess_new_json_data_models = TaskService.add_bpmn_process(
                 subprocess_properties,
                 self.process_instance_model,
                 bpmn_process_parent,
                 bpmn_process_guid=subprocess_task_id,
             )
+            new_task_models.update(subprocess_new_task_models)
+            new_json_data_models.update(subprocess_new_json_data_models)
+        db.session.bulk_save_objects(new_task_models.values())
+        db.session.bulk_save_objects(new_json_data_models.values())

     def save(self) -> None:
         """Saves the current state of this processor to the database."""
@@ -1881,9 +1885,11 @@ class ProcessInstanceProcessor:
             db.session.add(details_model)
             # #######

-        TaskService.update_task_model_and_add_to_db_session(
+        json_data = TaskService.update_task_model_and_add_to_db_session(
             task_model, spiff_task, self._serializer
         )
+        if json_data is not None:
+            db.session.add(json_data)

         # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
         self.save()
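Note: the processor change above replaces per-object db.session.add() calls with dicts of pending models flushed in one shot via db.session.bulk_save_objects(). A minimal, self-contained sketch of that batching pattern follows, using plain SQLAlchemy with an in-memory SQLite database; the Task model and guids here are hypothetical stand-ins, not the real TaskModel.

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Task(Base):
        __tablename__ = "task"
        id = Column(Integer, primary_key=True)
        guid = Column(String, unique=True, nullable=False)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    # Accumulate pending rows keyed by guid so repeat updates collapse to one row.
    new_task_models: dict[str, Task] = {}
    for guid in ("task-1", "task-2", "task-1"):
        new_task_models[guid] = Task(guid=guid)

    with Session(engine) as session:
        # One bulk INSERT instead of several session.add() calls, then one commit.
        session.bulk_save_objects(new_task_models.values())
        session.commit()
        print(session.query(Task).count())  # -> 2

One caveat worth noting: bulk_save_objects() bypasses most unit-of-work bookkeeping (no relationship cascades, and primary keys are not populated on the Python objects by default), which is why the diff builds each model fully before batching it.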
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
index 775e29099..4495ba713 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py
@@ -19,9 +19,10 @@ class TaskService:
     @classmethod
     def update_task_data_on_task_model(
         cls, task_model: TaskModel, task_data_dict: dict
-    ) -> None:
+    ) -> Optional[JsonDataModel]:
         task_data_json = json.dumps(task_data_dict, sort_keys=True)
         task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()
+        json_data_to_return = None
         if task_model.json_data_hash != task_data_hash:
             json_data = (
                 db.session.query(JsonDataModel.id)
@@ -30,8 +31,9 @@
             )
             if json_data is None:
                 json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
-                db.session.add(json_data)
+                json_data_to_return = json_data
         task_model.json_data_hash = task_data_hash
+        return json_data_to_return

     @classmethod
     def update_task_model_and_add_to_db_session(
@@ -39,7 +41,7 @@
         task_model: TaskModel,
         spiff_task: SpiffTask,
         serializer: BpmnWorkflowSerializer,
-    ) -> None:
+    ) -> Optional[JsonDataModel]:
         """Updates properties_json and data on given task_model.

         This will NOT update start_in_seconds or end_in_seconds.
@@ -48,8 +50,8 @@
         spiff_task_data = new_properties_json.pop("data")
         task_model.properties_json = new_properties_json
         task_model.state = TaskStateNames[new_properties_json["state"]]
-        cls.update_task_data_on_task_model(task_model, spiff_task_data)
-        db.session.add(task_model)
+        json_data = cls.update_task_data_on_task_model(task_model, spiff_task_data)
+        return json_data

     @classmethod
     def find_or_create_task_model_from_spiff_task(
@@ -57,13 +59,16 @@
         spiff_task: SpiffTask,
         process_instance: ProcessInstanceModel,
         serializer: BpmnWorkflowSerializer,
-    ) -> TaskModel:
+    ) -> Tuple[Optional[BpmnProcessModel], TaskModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
         spiff_task_guid = str(spiff_task.id)
         task_model: Optional[TaskModel] = TaskModel.query.filter_by(
             guid=spiff_task_guid
         ).first()
+        bpmn_process = None
+        new_task_models: dict[str, TaskModel] = {}
+        new_json_data_models: dict[str, JsonDataModel] = {}
         if task_model is None:
-            bpmn_process = cls.task_bpmn_process(
+            bpmn_process, new_task_models, new_json_data_models = cls.task_bpmn_process(
                 spiff_task, process_instance, serializer
             )
             task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
@@ -71,7 +76,7 @@
                 task_model = TaskModel(
                     guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
                 )
-        return task_model
+        return (bpmn_process, task_model, new_task_models, new_json_data_models)

     @classmethod
     def task_subprocess(
@@ -96,34 +101,34 @@
         spiff_task: SpiffTask,
         process_instance: ProcessInstanceModel,
         serializer: BpmnWorkflowSerializer,
-    ) -> BpmnProcessModel:
+    ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
         subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
         bpmn_process: Optional[BpmnProcessModel] = None
+        new_task_models: dict[str, TaskModel] = {}
+        new_json_data_models: dict[str, JsonDataModel] = {}
         if subprocess is None:
             bpmn_process = process_instance.bpmn_process
             # This is the top level workflow, which has no guid
             # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
             if process_instance.bpmn_process_id is None:
-                bpmn_process = cls.add_bpmn_process(
+                bpmn_process, new_task_models, new_json_data_models = cls.add_bpmn_process(
                     serializer.workflow_to_dict(
                         spiff_task.workflow._get_outermost_workflow()
                     ),
                     process_instance,
                 )
-                db.session.commit()
         else:
             bpmn_process = BpmnProcessModel.query.filter_by(
                 guid=subprocess_guid
             ).first()
             if bpmn_process is None:
-                bpmn_process = cls.add_bpmn_process(
+                bpmn_process, new_task_models, new_json_data_models = cls.add_bpmn_process(
                     serializer.workflow_to_dict(subprocess),
                     process_instance,
                     process_instance.bpmn_process,
                     subprocess_guid,
                 )
-                db.session.commit()
-        return bpmn_process
+        return (bpmn_process, new_task_models, new_json_data_models)

     @classmethod
     def add_bpmn_process(
@@ -132,7 +137,7 @@
         process_instance: ProcessInstanceModel,
         bpmn_process_parent: Optional[BpmnProcessModel] = None,
         bpmn_process_guid: Optional[str] = None,
-    ) -> BpmnProcessModel:
+    ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
         tasks = bpmn_process_dict.pop("tasks")
         bpmn_process_data_dict = bpmn_process_dict.pop("data")

@@ -174,6 +179,8 @@
             bpmn_process.parent_process_id = bpmn_process_parent.id
         db.session.add(bpmn_process)

+        new_task_models = {}
+        new_json_data_models = {}
         if bpmn_process_is_new:
             for task_id, task_properties in tasks.items():
                 task_data_dict = task_properties.pop("data")
@@ -194,7 +201,9 @@
                 task_model.state = TaskStateNames[state_int]
                 task_model.properties_json = task_properties

-                TaskService.update_task_data_on_task_model(task_model, task_data_dict)
-                db.session.add(task_model)
+                json_data = TaskService.update_task_data_on_task_model(task_model, task_data_dict)
+                new_task_models[task_model.guid] = task_model
+                if json_data is not None:
+                    new_json_data_models[json_data.hash] = json_data

-        return bpmn_process
+        return (bpmn_process, new_task_models, new_json_data_models)
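For context on the task_service.py change: update_task_data_on_task_model() content-addresses task data by hashing the deterministically serialized dict with sha256, and now returns the pending JsonDataModel (or None) instead of adding it to the session, so callers can batch it. A hedged sketch of that check follows, with a plain dict standing in for the json_data table; propose_json_data is an illustrative name, not a function from the codebase.

    import json
    from hashlib import sha256
    from typing import Optional

    json_data_table: dict[str, dict] = {}  # hash -> data; stand-in for JsonDataModel

    def propose_json_data(
        current_hash: Optional[str], task_data: dict
    ) -> tuple[str, Optional[dict]]:
        """Return (new_hash, pending_row); pending_row is None when no INSERT is needed."""
        serialized = json.dumps(task_data, sort_keys=True)  # stable key order
        new_hash = sha256(serialized.encode("utf8")).hexdigest()
        if current_hash == new_hash or new_hash in json_data_table:
            return new_hash, None  # unchanged, or identical payload already stored
        return new_hash, {"hash": new_hash, "data": task_data}

    new_hash, pending = propose_json_data(None, {"b": 2, "a": 1})
    assert pending is not None
    json_data_table[new_hash] = pending["data"]
    # The same payload with different key order hashes identically: nothing to insert.
    assert propose_json_data(None, {"a": 1, "b": 2})[1] is None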
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index 1d12b9765..f9e78bd5f 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -12,6 +12,7 @@ from SpiffWorkflow.task import TaskState
 from spiffworkflow_backend.exceptions.api_error import ApiError
 from spiffworkflow_backend.models.db import db
+from spiffworkflow_backend.models.json_data import JsonDataModel
 from spiffworkflow_backend.models.message_instance import MessageInstanceModel
 from spiffworkflow_backend.models.message_instance_correlation import (
     MessageInstanceCorrelationRuleModel,
 )
@@ -58,6 +59,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
         self.process_instance = process_instance
         self.current_task_model: Optional[TaskModel] = None
+        self.task_models: dict[str, TaskModel] = {}
+        self.json_data_models: dict[str, JsonDataModel] = {}
         self.serializer = serializer

     def should_update_task_model(self) -> bool:
@@ -66,15 +69,15 @@
         Use the bpmn_process_id to do this.
""" return self.process_instance.bpmn_process_id is not None - # return False def will_complete_task(self, spiff_task: SpiffTask) -> None: if self.should_update_task_model(): - self.current_task_model = ( - TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, self.process_instance, self.serializer - ) + _bpmn_process, task_model, new_task_models, new_json_data_models = TaskService.find_or_create_task_model_from_spiff_task( + spiff_task, self.process_instance, self.serializer ) + self.current_task_model = task_model + self.task_models.update(new_task_models) + self.json_data_models.update(new_json_data_models) self.current_task_model.start_in_seconds = time.time() if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.will_complete_task(spiff_task) @@ -82,14 +85,18 @@ class TaskModelSavingDelegate(EngineStepDelegate): def did_complete_task(self, spiff_task: SpiffTask) -> None: if self.current_task_model and self.should_update_task_model(): self.current_task_model.end_in_seconds = time.time() - TaskService.update_task_model_and_add_to_db_session( + json_data = TaskService.update_task_model_and_add_to_db_session( self.current_task_model, spiff_task, self.serializer ) - db.session.add(self.current_task_model) + if json_data is not None: + self.json_data_models[json_data.hash] = json_data + self.task_models[self.current_task_model.guid] = self.current_task_model if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) def save(self, _commit: bool = True) -> None: + db.session.bulk_save_objects(self.task_models.values()) + db.session.bulk_save_objects(self.json_data_models.values()) if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.save(commit=False) db.session.commit() @@ -104,17 +111,17 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.MAYBE | TaskState.LIKELY ): - task_model = TaskModel.query.filter_by( - guid=str(waiting_spiff_task.id) - ).first() - if task_model is None: - task_model = TaskService.find_or_create_task_model_from_spiff_task( - waiting_spiff_task, self.process_instance, self.serializer - ) - TaskService.update_task_model_and_add_to_db_session( + _bpmn_process, task_model, new_task_models, new_json_data_models = TaskService.find_or_create_task_model_from_spiff_task( + waiting_spiff_task, self.process_instance, self.serializer + ) + self.task_models.update(new_task_models) + self.json_data_models.update(new_json_data_models) + json_data = TaskService.update_task_model_and_add_to_db_session( task_model, waiting_spiff_task, self.serializer ) - db.session.commit() + self.task_models[task_model.guid] = task_model + if json_data is not None: + self.json_data_models[json_data.hash] = json_data class StepDetailLoggingDelegate(EngineStepDelegate):