From 6d5c03a3d0880b08ba8c0f66ddbd6af964c1367c Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 10 Mar 2023 12:23:27 -0500 Subject: [PATCH] moved add_bpmn_process to task_service w/ burnettk --- .../services/process_instance_processor.py | 78 +----------------- .../services/task_service.py | 82 +++++++++++++++++-- .../services/workflow_execution_service.py | 4 +- 3 files changed, 81 insertions(+), 83 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index cdfa44ce4..a68d7d09f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1042,78 +1042,6 @@ class ProcessInstanceProcessor: bpmn_process_definition_parent ) - def _add_bpmn_process( - self, - bpmn_process_dict: dict, - bpmn_process_parent: Optional[BpmnProcessModel] = None, - bpmn_process_guid: Optional[str] = None, - add_tasks_if_new_bpmn_process: bool = True, - ) -> BpmnProcessModel: - tasks = bpmn_process_dict.pop("tasks") - bpmn_process_data = bpmn_process_dict.pop("data") - - bpmn_process = None - if bpmn_process_parent is not None: - bpmn_process = BpmnProcessModel.query.filter_by( - parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid - ).first() - elif self.process_instance_model.bpmn_process_id is not None: - bpmn_process = self.process_instance_model.bpmn_process - - bpmn_process_is_new = False - if bpmn_process is None: - bpmn_process_is_new = True - bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) - - bpmn_process.properties_json = bpmn_process_dict - - bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode( - "utf8" - ) - bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest() - if bpmn_process.json_data_hash != bpmn_process_data_hash: - json_data = ( - db.session.query(JsonDataModel.id) - .filter_by(hash=bpmn_process_data_hash) - .first() - ) - if json_data is None: - json_data = JsonDataModel( - hash=bpmn_process_data_hash, data=bpmn_process_data - ) - db.session.add(json_data) - bpmn_process.json_data_hash = bpmn_process_data_hash - - if bpmn_process_parent is None: - self.process_instance_model.bpmn_process = bpmn_process - elif bpmn_process.parent_process_id is None: - bpmn_process.parent_process_id = bpmn_process_parent.id - db.session.add(bpmn_process) - - if bpmn_process_is_new and add_tasks_if_new_bpmn_process: - # if True: - for task_id, task_properties in tasks.items(): - task_data_dict = task_properties.pop("data") - state_int = task_properties["state"] - - task_model = TaskModel.query.filter_by(guid=task_id).first() - if task_model is None: - # bpmn_process_identifier = task_properties['workflow_name'] - # bpmn_identifier = task_properties['task_spec'] - # - # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) - # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() - # if task_definition is None: - # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) - task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) - task_model.state = TaskStateNames[state_int] - task_model.properties_json = task_properties - - TaskService.update_task_data_on_task_model(task_model, task_data_dict) - db.session.add(task_model) - - return bpmn_process - def _add_bpmn_json_records(self) -> None: """Adds serialized_bpmn_definition and process_instance_data records to the db session. @@ -1139,12 +1067,13 @@ class ProcessInstanceProcessor: # import pdb; pdb.set_trace() # if self.process_instance_model.bpmn_process_id is None: subprocesses = process_instance_data_dict.pop("subprocesses") - bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) + bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model) for subprocess_task_id, subprocess_properties in subprocesses.items(): # import pdb; pdb.set_trace() print(f"subprocess_task_id: {subprocess_task_id}") - self._add_bpmn_process( + TaskService.add_bpmn_process( subprocess_properties, + self.process_instance_model, bpmn_process_parent, bpmn_process_guid=subprocess_task_id, ) @@ -1700,7 +1629,6 @@ class ProcessInstanceProcessor: secondary_engine_step_delegate=step_delegate, serializer=self._serializer, process_instance=self.process_instance_model, - add_bpmn_process=self._add_bpmn_process, ) execution_strategy = execution_strategy_named( execution_strategy_name, task_model_delegate diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 700f29ca0..763dd1b68 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -53,14 +53,14 @@ class TaskService: @classmethod def find_or_create_task_model_from_spiff_task( cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer, add_bpmn_process: Any + serializer: BpmnWorkflowSerializer ) -> TaskModel: spiff_task_guid = str(spiff_task.id) task_model: Optional[TaskModel] = TaskModel.query.filter_by( guid=spiff_task_guid ).first() if task_model is None: - bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer, add_bpmn_process) + bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer) task_model = TaskModel.query.filter_by( guid=spiff_task_guid ).first() @@ -72,7 +72,7 @@ class TaskService: return task_model @classmethod - def task_subprocess(cls, spiff_task: SpiffTask) -> Optional[Tuple[str, BpmnWorkflow]]: + def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: top_level_workflow = spiff_task.workflow._get_outermost_workflow() my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of my_sp = None @@ -89,7 +89,7 @@ class TaskService: @classmethod def task_bpmn_process( cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer, add_bpmn_process: Any + serializer: BpmnWorkflowSerializer ) -> BpmnProcessModel: subprocess_guid, subprocess = cls.task_subprocess(spiff_task) if subprocess is None: @@ -102,10 +102,82 @@ class TaskService: ).first() # import pdb; pdb.set_trace() if bpmn_process is None: - bpmn_process = add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance.bpmn_process, subprocess_guid, add_tasks_if_new_bpmn_process=True) + bpmn_process = cls.add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance, process_instance.bpmn_process, subprocess_guid) db.session.commit() # spiff_task_guid = str(spiff_task.id) # raise Exception( # f"Could not find bpmn_process for task {spiff_task_guid}" # ) return bpmn_process + + @classmethod + def add_bpmn_process( + cls, + bpmn_process_dict: dict, + process_instance: ProcessInstanceModel, + bpmn_process_parent: Optional[BpmnProcessModel] = None, + bpmn_process_guid: Optional[str] = None, + ) -> BpmnProcessModel: + tasks = bpmn_process_dict.pop("tasks") + bpmn_process_data = bpmn_process_dict.pop("data") + + bpmn_process = None + if bpmn_process_parent is not None: + bpmn_process = BpmnProcessModel.query.filter_by( + parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid + ).first() + elif process_instance.bpmn_process_id is not None: + bpmn_process = process_instance.bpmn_process + + bpmn_process_is_new = False + if bpmn_process is None: + bpmn_process_is_new = True + bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) + + bpmn_process.properties_json = bpmn_process_dict + + bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode( + "utf8" + ) + bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest() + if bpmn_process.json_data_hash != bpmn_process_data_hash: + json_data = ( + db.session.query(JsonDataModel.id) + .filter_by(hash=bpmn_process_data_hash) + .first() + ) + if json_data is None: + json_data = JsonDataModel( + hash=bpmn_process_data_hash, data=bpmn_process_data + ) + db.session.add(json_data) + bpmn_process.json_data_hash = bpmn_process_data_hash + + if bpmn_process_parent is None: + process_instance.bpmn_process = bpmn_process + elif bpmn_process.parent_process_id is None: + bpmn_process.parent_process_id = bpmn_process_parent.id + db.session.add(bpmn_process) + + if bpmn_process_is_new: + for task_id, task_properties in tasks.items(): + task_data_dict = task_properties.pop("data") + state_int = task_properties["state"] + + task_model = TaskModel.query.filter_by(guid=task_id).first() + if task_model is None: + # bpmn_process_identifier = task_properties['workflow_name'] + # bpmn_identifier = task_properties['task_spec'] + # + # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) + # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() + # if task_definition is None: + # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) + task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) + task_model.state = TaskStateNames[state_int] + task_model.properties_json = task_properties + + TaskService.update_task_data_on_task_model(task_model, task_data_dict) + db.session.add(task_model) + + return bpmn_process diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 04cac19fb..37b289c0a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -51,14 +51,12 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, - add_bpmn_process: Any = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance self.current_task_model: Optional[TaskModel] = None self.serializer = serializer - self.add_bpmn_process = add_bpmn_process def should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. @@ -72,7 +70,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): if self.should_update_task_model(): self.current_task_model = ( TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, self.process_instance, self.serializer, self.add_bpmn_process + spiff_task, self.process_instance, self.serializer ) ) self.current_task_model.start_in_seconds = time.time()