diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 12ae8852..45d27509 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -100,7 +100,6 @@ from spiffworkflow_backend.services.process_instance_tmp_service import ProcessI from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate from spiffworkflow_backend.services.spec_file_service import SpecFileService -from spiffworkflow_backend.services.task_service import JsonDataDict from spiffworkflow_backend.services.task_service import TaskService from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named @@ -1164,38 +1163,34 @@ class ProcessInstanceProcessor: task.complete() spiff_tasks_updated[task.id] = task + task_service = TaskService( + process_instance=self.process_instance_model, + serializer=self._serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + ) for updated_spiff_task in spiff_tasks_updated.values(): ( bpmn_process, task_model, - new_task_models, - new_json_data_dicts, - ) = TaskService.find_or_create_task_model_from_spiff_task( + ) = task_service.find_or_create_task_model_from_spiff_task( updated_spiff_task, - self.process_instance_model, - self._serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) bpmn_process_to_use = bpmn_process or task_model.bpmn_process bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process( bpmn_process_to_use, updated_spiff_task.workflow.data ) db.session.add(bpmn_process_to_use) - json_data_dict_list = TaskService.update_task_model(task_model, updated_spiff_task, self._serializer) - for json_data_dict in json_data_dict_list: - if json_data_dict is not None: - new_json_data_dicts[json_data_dict["hash"]] = json_data_dict + task_service.update_task_model(task_model, updated_spiff_task) if bpmn_process_json_data is not None: - new_json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data + task_service.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data # spiff_task should be the main task we are completing and only it should get the timestamps if task_model.guid == str(spiff_task.id): task_model.start_in_seconds = start_in_seconds task_model.end_in_seconds = end_in_seconds - new_task_models[task_model.guid] = task_model - db.session.bulk_save_objects(new_task_models.values()) - TaskService.insert_or_update_json_data_records(new_json_data_dicts) + task_service.task_models[task_model.guid] = task_model + task_service.save_objects_to_database() ProcessInstanceTmpService.add_event_to_process_instance( self.process_instance_model, event_type, task_guid=task_id @@ -1736,10 +1731,13 @@ class ProcessInstanceProcessor: human_task.task_status = spiff_task.get_state_name() db.session.add(human_task) - json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self._serializer) - json_data_dict_mapping: dict[str, JsonDataDict] = {} - TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping) - TaskService.insert_or_update_json_data_records(json_data_dict_mapping) + task_service = TaskService( + process_instance=self.process_instance_model, + serializer=self._serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + ) + task_service.update_task_model(task_model, spiff_task) + TaskService.insert_or_update_json_data_records(task_service.json_data_dicts) ProcessInstanceTmpService.add_event_to_process_instance( self.process_instance_model, @@ -1747,12 +1745,6 @@ class ProcessInstanceProcessor: task_guid=task_model.guid, user_id=user.id, ) - - task_service = TaskService( - process_instance=self.process_instance_model, - serializer=self._serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - ) task_service.process_parents_and_children_and_save_to_database(spiff_task) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index e4e524bd..4ecd8f7f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -164,7 +164,7 @@ class TaskService: This will also process that subprocess task's children and will recurse upwards to process its parent subprocesses as well. """ - (parent_subprocess_guid, _parent_subprocess) = self.__class__.task_subprocess(spiff_task) + (parent_subprocess_guid, _parent_subprocess) = self.__class__._task_subprocess(spiff_task) if parent_subprocess_guid is not None: spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( UUID(parent_subprocess_guid) @@ -193,16 +193,9 @@ class TaskService: ( new_bpmn_process, task_model, - new_task_models, - new_json_data_dicts, - ) = self.__class__.find_or_create_task_model_from_spiff_task( + ) = self.find_or_create_task_model_from_spiff_task( spiff_task, - self.process_instance, - self.serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) - self.task_models.update(new_task_models) - self.json_data_dicts.update(new_json_data_dicts) # we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value bpmn_process = ( @@ -211,14 +204,13 @@ class TaskService: or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first() ) + self.update_task_model(task_model, spiff_task) bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process( bpmn_process, spiff_task.workflow.data ) - json_data_dict_list = self.__class__.update_task_model(task_model, spiff_task, self.serializer) - self.task_models[task_model.guid] = task_model if bpmn_process_json_data is not None: - json_data_dict_list.append(bpmn_process_json_data) - self.update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts) + self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data + self.task_models[task_model.guid] = task_model if start_and_end_times: task_model.start_in_seconds = start_and_end_times["start_in_seconds"] @@ -264,147 +256,93 @@ class TaskService: ).first() self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process) - @classmethod - def insert_or_update_json_data_records( - cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] - ) -> None: - list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()] - if len(list_of_dicts) > 0: - on_duplicate_key_stmt = None - if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": - insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts) - on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(data=insert_stmt.inserted.data) - else: - insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts) - on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["hash"]) - db.session.execute(on_duplicate_key_stmt) - - @classmethod def update_task_model( - cls, + self, task_model: TaskModel, spiff_task: SpiffTask, - serializer: BpmnWorkflowSerializer, - ) -> list[Optional[JsonDataDict]]: + ) -> None: """Updates properties_json and data on given task_model. This will NOT update start_in_seconds or end_in_seconds. It also returns the relating json_data object so they can be imported later. """ - new_properties_json = serializer.task_to_dict(spiff_task) + new_properties_json = self.serializer.task_to_dict(spiff_task) if new_properties_json["task_spec"] == "Start": new_properties_json["parent"] = None spiff_task_data = new_properties_json.pop("data") - python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task, serializer) + python_env_data_dict = self.__class__._get_python_env_data_dict_from_spiff_task(spiff_task, self.serializer) task_model.properties_json = new_properties_json task_model.state = TaskStateNames[new_properties_json["state"]] - json_data_dict = cls.update_task_data_on_task_model_and_return_dict_if_updated( + json_data_dict = self.__class__.update_task_data_on_task_model_and_return_dict_if_updated( task_model, spiff_task_data, "json_data_hash" ) - python_env_dict = cls.update_task_data_on_task_model_and_return_dict_if_updated( + python_env_dict = self.__class__.update_task_data_on_task_model_and_return_dict_if_updated( task_model, python_env_data_dict, "python_env_data_hash" ) - return [json_data_dict, python_env_dict] + if json_data_dict is not None: + self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + if python_env_dict is not None: + self.json_data_dicts[python_env_dict["hash"]] = python_env_dict - @classmethod def find_or_create_task_model_from_spiff_task( - cls, + self, spiff_task: SpiffTask, - process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer, - bpmn_definition_to_task_definitions_mappings: dict, - ) -> Tuple[Optional[BpmnProcessModel], TaskModel, dict[str, TaskModel], dict[str, JsonDataDict]]: + ) -> Tuple[Optional[BpmnProcessModel], TaskModel]: spiff_task_guid = str(spiff_task.id) task_model: Optional[TaskModel] = TaskModel.query.filter_by(guid=spiff_task_guid).first() bpmn_process = None - new_task_models: dict[str, TaskModel] = {} - new_json_data_dicts: dict[str, JsonDataDict] = {} if task_model is None: - bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( + bpmn_process = self.task_bpmn_process( spiff_task, - process_instance, - serializer, - bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, ) task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() if task_model is None: - task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ + task_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ spiff_task.task_spec.name ] task_model = TaskModel( guid=spiff_task_guid, bpmn_process_id=bpmn_process.id, - process_instance_id=process_instance.id, + process_instance_id=self.process_instance.id, task_definition_id=task_definition.id, ) - return (bpmn_process, task_model, new_task_models, new_json_data_dicts) + return (bpmn_process, task_model) - @classmethod - def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: - top_level_workflow = spiff_task.workflow._get_outermost_workflow() - my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of - my_sp = None - my_sp_id = None - if my_wf != top_level_workflow: - # All the subprocesses are at the top level, so you can just compare them - for sp_id, sp in top_level_workflow.subprocesses.items(): - if sp == my_wf: - my_sp = sp - my_sp_id = str(sp_id) - break - return (my_sp_id, my_sp) - - @classmethod def task_bpmn_process( - cls, + self, spiff_task: SpiffTask, - process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer, - bpmn_definition_to_task_definitions_mappings: dict, - ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: - subprocess_guid, subprocess = cls.task_subprocess(spiff_task) + ) -> BpmnProcessModel: + subprocess_guid, subprocess = self.__class__._task_subprocess(spiff_task) bpmn_process: Optional[BpmnProcessModel] = None - new_task_models: dict[str, TaskModel] = {} - new_json_data_dicts: dict[str, JsonDataDict] = {} if subprocess is None: - bpmn_process = process_instance.bpmn_process + bpmn_process = self.process_instance.bpmn_process # This is the top level workflow, which has no guid # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None - if process_instance.bpmn_process_id is None: + if self.process_instance.bpmn_process_id is None: spiff_workflow = spiff_task.workflow._get_outermost_workflow() - bpmn_process, new_task_models, new_json_data_dicts = cls.add_bpmn_process( - bpmn_process_dict=serializer.workflow_to_dict(spiff_workflow), - process_instance=process_instance, - bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, + bpmn_process = self.add_bpmn_process( + bpmn_process_dict=self.serializer.workflow_to_dict(spiff_workflow), spiff_workflow=spiff_workflow, - serializer=serializer, ) else: bpmn_process = BpmnProcessModel.query.filter_by(guid=subprocess_guid).first() if bpmn_process is None: spiff_workflow = spiff_task.workflow - bpmn_process, new_task_models, new_json_data_dicts = cls.add_bpmn_process( - bpmn_process_dict=serializer.workflow_to_dict(subprocess), - process_instance=process_instance, - top_level_process=process_instance.bpmn_process, + bpmn_process = self.add_bpmn_process( + bpmn_process_dict=self.serializer.workflow_to_dict(subprocess), + top_level_process=self.process_instance.bpmn_process, bpmn_process_guid=subprocess_guid, - bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, spiff_workflow=spiff_workflow, - serializer=serializer, ) - return (bpmn_process, new_task_models, new_json_data_dicts) + return bpmn_process - @classmethod def add_bpmn_process( - cls, + self, bpmn_process_dict: dict, - process_instance: ProcessInstanceModel, - bpmn_definition_to_task_definitions_mappings: dict, spiff_workflow: BpmnWorkflow, - serializer: BpmnWorkflowSerializer, top_level_process: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None, - ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: + ) -> BpmnProcessModel: """This creates and adds a bpmn_process to the Db session. It will also add tasks and relating json_data entries if the bpmn_process is new. @@ -420,23 +358,20 @@ class TaskService: if "subprocess_specs" in bpmn_process_dict: bpmn_process_dict.pop("subprocess_specs") - new_task_models: dict[str, TaskModel] = {} - new_json_data_dicts: dict[str, JsonDataDict] = {} - bpmn_process = None if top_level_process is not None: bpmn_process = BpmnProcessModel.query.filter_by( top_level_process_id=top_level_process.id, guid=bpmn_process_guid ).first() - elif process_instance.bpmn_process_id is not None: - bpmn_process = process_instance.bpmn_process + elif self.process_instance.bpmn_process_id is not None: + bpmn_process = self.process_instance.bpmn_process bpmn_process_is_new = False if bpmn_process is None: bpmn_process_is_new = True bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) - bpmn_process_definition = bpmn_definition_to_task_definitions_mappings[spiff_workflow.spec.name][ + bpmn_process_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_workflow.spec.name][ "bpmn_process_definition" ] bpmn_process.bpmn_process_definition = bpmn_process_definition @@ -470,12 +405,12 @@ class TaskService: bpmn_process.properties_json = bpmn_process_dict - bpmn_process_json_data = cls.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict) + bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict) if bpmn_process_json_data is not None: - new_json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data + self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data if top_level_process is None: - process_instance.bpmn_process = bpmn_process + self.process_instance.bpmn_process = bpmn_process elif bpmn_process.top_level_process_id is None: bpmn_process.top_level_process_id = top_level_process.id @@ -484,37 +419,44 @@ class TaskService: db.session.add(bpmn_process) if bpmn_process_is_new: - for task_id, task_properties in tasks.items(): - # The Root task is added to the spec by Spiff when the bpmn process is instantiated - # within Spiff. We do not actually need it and it's missing from our initial - # bpmn process defintion so let's avoid using it. - if task_properties["task_spec"] == "Root": - continue + self.add_tasks_to_bpmn_process( + tasks=tasks, + spiff_workflow=spiff_workflow, + bpmn_process=bpmn_process, + ) + return bpmn_process - # we are going to avoid saving likely and maybe tasks to the db. - # that means we need to remove them from their parents' lists of children as well. - spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) - if spiff_task._has_state(TaskState.PREDICTED_MASK): - cls.remove_spiff_task_from_parent(spiff_task, new_task_models) - continue + def add_tasks_to_bpmn_process( + self, + tasks: dict, + spiff_workflow: BpmnWorkflow, + bpmn_process: BpmnProcessModel, + ) -> None: + for task_id, task_properties in tasks.items(): + # The Root task is added to the spec by Spiff when the bpmn process is instantiated + # within Spiff. We do not actually need it and it's missing from our initial + # bpmn process defintion so let's avoid using it. + if task_properties["task_spec"] == "Root": + continue - task_model = TaskModel.query.filter_by(guid=task_id).first() - if task_model is None: - task_model = cls._create_task( - bpmn_process, - process_instance, - spiff_task, - bpmn_definition_to_task_definitions_mappings, - ) + # we are going to avoid saving likely and maybe tasks to the db. + # that means we need to remove them from their parents' lists of children as well. + spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) + if spiff_task._has_state(TaskState.PREDICTED_MASK): + self.__class__.remove_spiff_task_from_parent(spiff_task, self.task_models) + continue - json_data_dict, python_env_dict = cls.update_task_model(task_model, spiff_task, serializer) + task_model = TaskModel.query.filter_by(guid=task_id).first() + if task_model is None: + task_model = self.__class__._create_task( + bpmn_process, + self.process_instance, + spiff_task, + self.bpmn_definition_to_task_definitions_mappings, + ) - new_task_models[task_model.guid] = task_model - if json_data_dict is not None: - new_json_data_dicts[json_data_dict["hash"]] = json_data_dict - if python_env_dict is not None: - new_json_data_dicts[python_env_dict["hash"]] = python_env_dict - return (bpmn_process, new_task_models, new_json_data_dicts) + self.update_task_model(task_model, spiff_task) + self.task_models[task_model.guid] = task_model @classmethod def remove_spiff_task_from_parent(cls, spiff_task: SpiffTask, task_models: dict[str, TaskModel]) -> None: @@ -557,19 +499,6 @@ class TaskService: setattr(task_model, task_model_data_column, task_data_hash) return json_data_dict - @classmethod - def bpmn_process_for_called_activity_or_top_level_process(cls, task_model: TaskModel) -> BpmnProcessModel: - """Returns either the bpmn process for the call activity calling the process or the top level bpmn process. - - For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB. - processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with - the spec reference cache to find process_modelB. - """ - (bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes( - task_model, stop_on_first_call_activity=True - ) - return bpmn_processes[0] - @classmethod def bpmn_process_and_descendants(cls, bpmn_processes: list[BpmnProcessModel]) -> list[BpmnProcessModel]: bpmn_process_ids = [p.id for p in bpmn_processes] @@ -584,7 +513,7 @@ class TaskService: def task_models_of_parent_bpmn_processes( cls, task_model: TaskModel, stop_on_first_call_activity: Optional[bool] = False ) -> Tuple[list[BpmnProcessModel], list[TaskModel]]: - """Returns the list of task models that are associated with the paren bpmn process. + """Returns the list of task models that are associated with the parent bpmn process. Example: TopLevelProcess has SubprocessTaskA which has CallActivityTaskA which has ScriptTaskA. SubprocessTaskA corresponds to SpiffSubprocess1. @@ -613,6 +542,8 @@ class TaskService: b, t = cls.task_models_of_parent_bpmn_processes( parent_task_model, stop_on_first_call_activity=stop_on_first_call_activity ) + # order matters here. since we are traversing backwards (from child to parent) then + # b and t should be the parents of whatever is in bpmn_processes and task_models. return (b + bpmn_processes, t + task_models) return (bpmn_processes, task_models) @@ -631,6 +562,19 @@ class TaskService: bpmn_process_identifiers.append(bpmn_process.bpmn_process_definition.bpmn_identifier) return bpmn_process_identifiers + @classmethod + def bpmn_process_for_called_activity_or_top_level_process(cls, task_model: TaskModel) -> BpmnProcessModel: + """Returns either the bpmn process for the call activity calling the process or the top level bpmn process. + + For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB. + processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with + the spec reference cache to find process_modelB. + """ + (bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes( + task_model, stop_on_first_call_activity=True + ) + return bpmn_processes[0] + @classmethod def reset_task_model_dict( cls, @@ -667,40 +611,19 @@ class TaskService: task_model.properties_json = new_properties_json @classmethod - def _create_task( - cls, - bpmn_process: BpmnProcessModel, - process_instance: ProcessInstanceModel, - spiff_task: SpiffTask, - bpmn_definition_to_task_definitions_mappings: dict, - ) -> TaskModel: - task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ - spiff_task.task_spec.name - ] - task_model = TaskModel( - guid=str(spiff_task.id), - bpmn_process_id=bpmn_process.id, - process_instance_id=process_instance.id, - task_definition_id=task_definition.id, - ) - return task_model - - @classmethod - def _get_python_env_data_dict_from_spiff_task( - cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer - ) -> dict: - user_defined_state = spiff_task.workflow.script_engine.environment.user_defined_state() - # this helps to convert items like datetime objects to be json serializable - converted_data: dict = serializer.data_converter.convert(user_defined_state) - return converted_data - - @classmethod - def update_json_data_dicts_using_list( - cls, json_data_dict_list: list[Optional[JsonDataDict]], json_data_dicts: dict[str, JsonDataDict] + def insert_or_update_json_data_records( + cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] ) -> None: - for json_data_dict in json_data_dict_list: - if json_data_dict is not None: - json_data_dicts[json_data_dict["hash"]] = json_data_dict + list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()] + if len(list_of_dicts) > 0: + on_duplicate_key_stmt = None + if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": + insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts) + on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(data=insert_stmt.inserted.data) + else: + insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts) + on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["hash"]) + db.session.execute(on_duplicate_key_stmt) @classmethod def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict: @@ -729,3 +652,46 @@ class TaskService: @classmethod def get_name_for_display(cls, entity: Union[TaskDefinitionModel, BpmnProcessDefinitionModel]) -> str: return entity.bpmn_name or entity.bpmn_identifier + + @classmethod + def _task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: + top_level_workflow = spiff_task.workflow._get_outermost_workflow() + my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of + my_sp = None + my_sp_id = None + if my_wf != top_level_workflow: + # All the subprocesses are at the top level, so you can just compare them + for sp_id, sp in top_level_workflow.subprocesses.items(): + if sp == my_wf: + my_sp = sp + my_sp_id = str(sp_id) + break + return (my_sp_id, my_sp) + + @classmethod + def _create_task( + cls, + bpmn_process: BpmnProcessModel, + process_instance: ProcessInstanceModel, + spiff_task: SpiffTask, + bpmn_definition_to_task_definitions_mappings: dict, + ) -> TaskModel: + task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ + spiff_task.task_spec.name + ] + task_model = TaskModel( + guid=str(spiff_task.id), + bpmn_process_id=bpmn_process.id, + process_instance_id=process_instance.id, + task_definition_id=task_definition.id, + ) + return task_model + + @classmethod + def _get_python_env_data_dict_from_spiff_task( + cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer + ) -> dict: + user_defined_state = spiff_task.workflow.script_engine.environment.user_defined_state() + # this helps to convert items like datetime objects to be json serializable + converted_data: dict = serializer.data_converter.convert(user_defined_state) + return converted_data