refactored TaskService so more of the classmethods are now instance methods and work more implicitly

This commit is contained in:
jasquat 2023-04-25 12:58:00 -04:00
parent 849232be33
commit d2911889c1
2 changed files with 162 additions and 204 deletions

View File

@ -100,7 +100,6 @@ from spiffworkflow_backend.services.process_instance_tmp_service import ProcessI
from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
from spiffworkflow_backend.services.spec_file_service import SpecFileService from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.task_service import JsonDataDict
from spiffworkflow_backend.services.task_service import TaskService from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named
@ -1164,38 +1163,34 @@ class ProcessInstanceProcessor:
task.complete() task.complete()
spiff_tasks_updated[task.id] = task spiff_tasks_updated[task.id] = task
task_service = TaskService(
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
for updated_spiff_task in spiff_tasks_updated.values(): for updated_spiff_task in spiff_tasks_updated.values():
( (
bpmn_process, bpmn_process,
task_model, task_model,
new_task_models, ) = task_service.find_or_create_task_model_from_spiff_task(
new_json_data_dicts,
) = TaskService.find_or_create_task_model_from_spiff_task(
updated_spiff_task, updated_spiff_task,
self.process_instance_model,
self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
bpmn_process_to_use = bpmn_process or task_model.bpmn_process bpmn_process_to_use = bpmn_process or task_model.bpmn_process
bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process( bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(
bpmn_process_to_use, updated_spiff_task.workflow.data bpmn_process_to_use, updated_spiff_task.workflow.data
) )
db.session.add(bpmn_process_to_use) db.session.add(bpmn_process_to_use)
json_data_dict_list = TaskService.update_task_model(task_model, updated_spiff_task, self._serializer) task_service.update_task_model(task_model, updated_spiff_task)
for json_data_dict in json_data_dict_list:
if json_data_dict is not None:
new_json_data_dicts[json_data_dict["hash"]] = json_data_dict
if bpmn_process_json_data is not None: if bpmn_process_json_data is not None:
new_json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data task_service.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data
# spiff_task should be the main task we are completing and only it should get the timestamps # spiff_task should be the main task we are completing and only it should get the timestamps
if task_model.guid == str(spiff_task.id): if task_model.guid == str(spiff_task.id):
task_model.start_in_seconds = start_in_seconds task_model.start_in_seconds = start_in_seconds
task_model.end_in_seconds = end_in_seconds task_model.end_in_seconds = end_in_seconds
new_task_models[task_model.guid] = task_model task_service.task_models[task_model.guid] = task_model
db.session.bulk_save_objects(new_task_models.values()) task_service.save_objects_to_database()
TaskService.insert_or_update_json_data_records(new_json_data_dicts)
ProcessInstanceTmpService.add_event_to_process_instance( ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, event_type, task_guid=task_id self.process_instance_model, event_type, task_guid=task_id
@ -1736,10 +1731,13 @@ class ProcessInstanceProcessor:
human_task.task_status = spiff_task.get_state_name() human_task.task_status = spiff_task.get_state_name()
db.session.add(human_task) db.session.add(human_task)
json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self._serializer) task_service = TaskService(
json_data_dict_mapping: dict[str, JsonDataDict] = {} process_instance=self.process_instance_model,
TaskService.update_json_data_dicts_using_list(json_data_dict_list, json_data_dict_mapping) serializer=self._serializer,
TaskService.insert_or_update_json_data_records(json_data_dict_mapping) bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
task_service.update_task_model(task_model, spiff_task)
TaskService.insert_or_update_json_data_records(task_service.json_data_dicts)
ProcessInstanceTmpService.add_event_to_process_instance( ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, self.process_instance_model,
@ -1747,12 +1745,6 @@ class ProcessInstanceProcessor:
task_guid=task_model.guid, task_guid=task_model.guid,
user_id=user.id, user_id=user.id,
) )
task_service = TaskService(
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
task_service.process_parents_and_children_and_save_to_database(spiff_task) task_service.process_parents_and_children_and_save_to_database(spiff_task)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)

View File

@ -164,7 +164,7 @@ class TaskService:
This will also process that subprocess task's children and will recurse upwards This will also process that subprocess task's children and will recurse upwards
to process its parent subprocesses as well. to process its parent subprocesses as well.
""" """
(parent_subprocess_guid, _parent_subprocess) = self.__class__.task_subprocess(spiff_task) (parent_subprocess_guid, _parent_subprocess) = self.__class__._task_subprocess(spiff_task)
if parent_subprocess_guid is not None: if parent_subprocess_guid is not None:
spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id(
UUID(parent_subprocess_guid) UUID(parent_subprocess_guid)
@ -193,16 +193,9 @@ class TaskService:
( (
new_bpmn_process, new_bpmn_process,
task_model, task_model,
new_task_models, ) = self.find_or_create_task_model_from_spiff_task(
new_json_data_dicts,
) = self.__class__.find_or_create_task_model_from_spiff_task(
spiff_task, spiff_task,
self.process_instance,
self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
# we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value # we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value
bpmn_process = ( bpmn_process = (
@ -211,14 +204,13 @@ class TaskService:
or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first() or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first()
) )
self.update_task_model(task_model, spiff_task)
bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process( bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(
bpmn_process, spiff_task.workflow.data bpmn_process, spiff_task.workflow.data
) )
json_data_dict_list = self.__class__.update_task_model(task_model, spiff_task, self.serializer)
self.task_models[task_model.guid] = task_model
if bpmn_process_json_data is not None: if bpmn_process_json_data is not None:
json_data_dict_list.append(bpmn_process_json_data) self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data
self.update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts) self.task_models[task_model.guid] = task_model
if start_and_end_times: if start_and_end_times:
task_model.start_in_seconds = start_and_end_times["start_in_seconds"] task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
@ -264,147 +256,93 @@ class TaskService:
).first() ).first()
self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process) self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process)
@classmethod
def insert_or_update_json_data_records(
cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]
) -> None:
list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()]
if len(list_of_dicts) > 0:
on_duplicate_key_stmt = None
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql":
insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(data=insert_stmt.inserted.data)
else:
insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["hash"])
db.session.execute(on_duplicate_key_stmt)
@classmethod
def update_task_model( def update_task_model(
cls, self,
task_model: TaskModel, task_model: TaskModel,
spiff_task: SpiffTask, spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer, ) -> None:
) -> list[Optional[JsonDataDict]]:
"""Updates properties_json and data on given task_model. """Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds. This will NOT update start_in_seconds or end_in_seconds.
It also returns the relating json_data object so they can be imported later. It also returns the relating json_data object so they can be imported later.
""" """
new_properties_json = serializer.task_to_dict(spiff_task) new_properties_json = self.serializer.task_to_dict(spiff_task)
if new_properties_json["task_spec"] == "Start": if new_properties_json["task_spec"] == "Start":
new_properties_json["parent"] = None new_properties_json["parent"] = None
spiff_task_data = new_properties_json.pop("data") spiff_task_data = new_properties_json.pop("data")
python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task, serializer) python_env_data_dict = self.__class__._get_python_env_data_dict_from_spiff_task(spiff_task, self.serializer)
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json["state"]] task_model.state = TaskStateNames[new_properties_json["state"]]
json_data_dict = cls.update_task_data_on_task_model_and_return_dict_if_updated( json_data_dict = self.__class__.update_task_data_on_task_model_and_return_dict_if_updated(
task_model, spiff_task_data, "json_data_hash" task_model, spiff_task_data, "json_data_hash"
) )
python_env_dict = cls.update_task_data_on_task_model_and_return_dict_if_updated( python_env_dict = self.__class__.update_task_data_on_task_model_and_return_dict_if_updated(
task_model, python_env_data_dict, "python_env_data_hash" task_model, python_env_data_dict, "python_env_data_hash"
) )
return [json_data_dict, python_env_dict] if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
if python_env_dict is not None:
self.json_data_dicts[python_env_dict["hash"]] = python_env_dict
@classmethod
def find_or_create_task_model_from_spiff_task( def find_or_create_task_model_from_spiff_task(
cls, self,
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, ) -> Tuple[Optional[BpmnProcessModel], TaskModel]:
serializer: BpmnWorkflowSerializer,
bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[Optional[BpmnProcessModel], TaskModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
spiff_task_guid = str(spiff_task.id) spiff_task_guid = str(spiff_task.id)
task_model: Optional[TaskModel] = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model: Optional[TaskModel] = TaskModel.query.filter_by(guid=spiff_task_guid).first()
bpmn_process = None bpmn_process = None
new_task_models: dict[str, TaskModel] = {}
new_json_data_dicts: dict[str, JsonDataDict] = {}
if task_model is None: if task_model is None:
bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( bpmn_process = self.task_bpmn_process(
spiff_task, spiff_task,
process_instance,
serializer,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
) )
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
if task_model is None: if task_model is None:
task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ task_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][
spiff_task.task_spec.name spiff_task.task_spec.name
] ]
task_model = TaskModel( task_model = TaskModel(
guid=spiff_task_guid, guid=spiff_task_guid,
bpmn_process_id=bpmn_process.id, bpmn_process_id=bpmn_process.id,
process_instance_id=process_instance.id, process_instance_id=self.process_instance.id,
task_definition_id=task_definition.id, task_definition_id=task_definition.id,
) )
return (bpmn_process, task_model, new_task_models, new_json_data_dicts) return (bpmn_process, task_model)
@classmethod
def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
top_level_workflow = spiff_task.workflow._get_outermost_workflow()
my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of
my_sp = None
my_sp_id = None
if my_wf != top_level_workflow:
# All the subprocesses are at the top level, so you can just compare them
for sp_id, sp in top_level_workflow.subprocesses.items():
if sp == my_wf:
my_sp = sp
my_sp_id = str(sp_id)
break
return (my_sp_id, my_sp)
@classmethod
def task_bpmn_process( def task_bpmn_process(
cls, self,
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, ) -> BpmnProcessModel:
serializer: BpmnWorkflowSerializer, subprocess_guid, subprocess = self.__class__._task_subprocess(spiff_task)
bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None bpmn_process: Optional[BpmnProcessModel] = None
new_task_models: dict[str, TaskModel] = {}
new_json_data_dicts: dict[str, JsonDataDict] = {}
if subprocess is None: if subprocess is None:
bpmn_process = process_instance.bpmn_process bpmn_process = self.process_instance.bpmn_process
# This is the top level workflow, which has no guid # This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None: if self.process_instance.bpmn_process_id is None:
spiff_workflow = spiff_task.workflow._get_outermost_workflow() spiff_workflow = spiff_task.workflow._get_outermost_workflow()
bpmn_process, new_task_models, new_json_data_dicts = cls.add_bpmn_process( bpmn_process = self.add_bpmn_process(
bpmn_process_dict=serializer.workflow_to_dict(spiff_workflow), bpmn_process_dict=self.serializer.workflow_to_dict(spiff_workflow),
process_instance=process_instance,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
spiff_workflow=spiff_workflow, spiff_workflow=spiff_workflow,
serializer=serializer,
) )
else: else:
bpmn_process = BpmnProcessModel.query.filter_by(guid=subprocess_guid).first() bpmn_process = BpmnProcessModel.query.filter_by(guid=subprocess_guid).first()
if bpmn_process is None: if bpmn_process is None:
spiff_workflow = spiff_task.workflow spiff_workflow = spiff_task.workflow
bpmn_process, new_task_models, new_json_data_dicts = cls.add_bpmn_process( bpmn_process = self.add_bpmn_process(
bpmn_process_dict=serializer.workflow_to_dict(subprocess), bpmn_process_dict=self.serializer.workflow_to_dict(subprocess),
process_instance=process_instance, top_level_process=self.process_instance.bpmn_process,
top_level_process=process_instance.bpmn_process,
bpmn_process_guid=subprocess_guid, bpmn_process_guid=subprocess_guid,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
spiff_workflow=spiff_workflow, spiff_workflow=spiff_workflow,
serializer=serializer,
) )
return (bpmn_process, new_task_models, new_json_data_dicts) return bpmn_process
@classmethod
def add_bpmn_process( def add_bpmn_process(
cls, self,
bpmn_process_dict: dict, bpmn_process_dict: dict,
process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
spiff_workflow: BpmnWorkflow, spiff_workflow: BpmnWorkflow,
serializer: BpmnWorkflowSerializer,
top_level_process: Optional[BpmnProcessModel] = None, top_level_process: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: ) -> BpmnProcessModel:
"""This creates and adds a bpmn_process to the Db session. """This creates and adds a bpmn_process to the Db session.
It will also add tasks and relating json_data entries if the bpmn_process is new. It will also add tasks and relating json_data entries if the bpmn_process is new.
@ -420,23 +358,20 @@ class TaskService:
if "subprocess_specs" in bpmn_process_dict: if "subprocess_specs" in bpmn_process_dict:
bpmn_process_dict.pop("subprocess_specs") bpmn_process_dict.pop("subprocess_specs")
new_task_models: dict[str, TaskModel] = {}
new_json_data_dicts: dict[str, JsonDataDict] = {}
bpmn_process = None bpmn_process = None
if top_level_process is not None: if top_level_process is not None:
bpmn_process = BpmnProcessModel.query.filter_by( bpmn_process = BpmnProcessModel.query.filter_by(
top_level_process_id=top_level_process.id, guid=bpmn_process_guid top_level_process_id=top_level_process.id, guid=bpmn_process_guid
).first() ).first()
elif process_instance.bpmn_process_id is not None: elif self.process_instance.bpmn_process_id is not None:
bpmn_process = process_instance.bpmn_process bpmn_process = self.process_instance.bpmn_process
bpmn_process_is_new = False bpmn_process_is_new = False
if bpmn_process is None: if bpmn_process is None:
bpmn_process_is_new = True bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process_definition = bpmn_definition_to_task_definitions_mappings[spiff_workflow.spec.name][ bpmn_process_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_workflow.spec.name][
"bpmn_process_definition" "bpmn_process_definition"
] ]
bpmn_process.bpmn_process_definition = bpmn_process_definition bpmn_process.bpmn_process_definition = bpmn_process_definition
@ -470,12 +405,12 @@ class TaskService:
bpmn_process.properties_json = bpmn_process_dict bpmn_process.properties_json = bpmn_process_dict
bpmn_process_json_data = cls.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict) bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict)
if bpmn_process_json_data is not None: if bpmn_process_json_data is not None:
new_json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data
if top_level_process is None: if top_level_process is None:
process_instance.bpmn_process = bpmn_process self.process_instance.bpmn_process = bpmn_process
elif bpmn_process.top_level_process_id is None: elif bpmn_process.top_level_process_id is None:
bpmn_process.top_level_process_id = top_level_process.id bpmn_process.top_level_process_id = top_level_process.id
@ -484,37 +419,44 @@ class TaskService:
db.session.add(bpmn_process) db.session.add(bpmn_process)
if bpmn_process_is_new: if bpmn_process_is_new:
for task_id, task_properties in tasks.items(): self.add_tasks_to_bpmn_process(
# The Root task is added to the spec by Spiff when the bpmn process is instantiated tasks=tasks,
# within Spiff. We do not actually need it and it's missing from our initial spiff_workflow=spiff_workflow,
# bpmn process defintion so let's avoid using it. bpmn_process=bpmn_process,
if task_properties["task_spec"] == "Root": )
continue return bpmn_process
# we are going to avoid saving likely and maybe tasks to the db. def add_tasks_to_bpmn_process(
# that means we need to remove them from their parents' lists of children as well. self,
spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) tasks: dict,
if spiff_task._has_state(TaskState.PREDICTED_MASK): spiff_workflow: BpmnWorkflow,
cls.remove_spiff_task_from_parent(spiff_task, new_task_models) bpmn_process: BpmnProcessModel,
continue ) -> None:
for task_id, task_properties in tasks.items():
# The Root task is added to the spec by Spiff when the bpmn process is instantiated
# within Spiff. We do not actually need it and it's missing from our initial
# bpmn process defintion so let's avoid using it.
if task_properties["task_spec"] == "Root":
continue
task_model = TaskModel.query.filter_by(guid=task_id).first() # we are going to avoid saving likely and maybe tasks to the db.
if task_model is None: # that means we need to remove them from their parents' lists of children as well.
task_model = cls._create_task( spiff_task = spiff_workflow.get_task_from_id(UUID(task_id))
bpmn_process, if spiff_task._has_state(TaskState.PREDICTED_MASK):
process_instance, self.__class__.remove_spiff_task_from_parent(spiff_task, self.task_models)
spiff_task, continue
bpmn_definition_to_task_definitions_mappings,
)
json_data_dict, python_env_dict = cls.update_task_model(task_model, spiff_task, serializer) task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
task_model = self.__class__._create_task(
bpmn_process,
self.process_instance,
spiff_task,
self.bpmn_definition_to_task_definitions_mappings,
)
new_task_models[task_model.guid] = task_model self.update_task_model(task_model, spiff_task)
if json_data_dict is not None: self.task_models[task_model.guid] = task_model
new_json_data_dicts[json_data_dict["hash"]] = json_data_dict
if python_env_dict is not None:
new_json_data_dicts[python_env_dict["hash"]] = python_env_dict
return (bpmn_process, new_task_models, new_json_data_dicts)
@classmethod @classmethod
def remove_spiff_task_from_parent(cls, spiff_task: SpiffTask, task_models: dict[str, TaskModel]) -> None: def remove_spiff_task_from_parent(cls, spiff_task: SpiffTask, task_models: dict[str, TaskModel]) -> None:
@ -557,19 +499,6 @@ class TaskService:
setattr(task_model, task_model_data_column, task_data_hash) setattr(task_model, task_model_data_column, task_data_hash)
return json_data_dict return json_data_dict
@classmethod
def bpmn_process_for_called_activity_or_top_level_process(cls, task_model: TaskModel) -> BpmnProcessModel:
"""Returns either the bpmn process for the call activity calling the process or the top level bpmn process.
For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB.
processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with
the spec reference cache to find process_modelB.
"""
(bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes(
task_model, stop_on_first_call_activity=True
)
return bpmn_processes[0]
@classmethod @classmethod
def bpmn_process_and_descendants(cls, bpmn_processes: list[BpmnProcessModel]) -> list[BpmnProcessModel]: def bpmn_process_and_descendants(cls, bpmn_processes: list[BpmnProcessModel]) -> list[BpmnProcessModel]:
bpmn_process_ids = [p.id for p in bpmn_processes] bpmn_process_ids = [p.id for p in bpmn_processes]
@ -584,7 +513,7 @@ class TaskService:
def task_models_of_parent_bpmn_processes( def task_models_of_parent_bpmn_processes(
cls, task_model: TaskModel, stop_on_first_call_activity: Optional[bool] = False cls, task_model: TaskModel, stop_on_first_call_activity: Optional[bool] = False
) -> Tuple[list[BpmnProcessModel], list[TaskModel]]: ) -> Tuple[list[BpmnProcessModel], list[TaskModel]]:
"""Returns the list of task models that are associated with the paren bpmn process. """Returns the list of task models that are associated with the parent bpmn process.
Example: TopLevelProcess has SubprocessTaskA which has CallActivityTaskA which has ScriptTaskA. Example: TopLevelProcess has SubprocessTaskA which has CallActivityTaskA which has ScriptTaskA.
SubprocessTaskA corresponds to SpiffSubprocess1. SubprocessTaskA corresponds to SpiffSubprocess1.
@ -613,6 +542,8 @@ class TaskService:
b, t = cls.task_models_of_parent_bpmn_processes( b, t = cls.task_models_of_parent_bpmn_processes(
parent_task_model, stop_on_first_call_activity=stop_on_first_call_activity parent_task_model, stop_on_first_call_activity=stop_on_first_call_activity
) )
# order matters here. since we are traversing backwards (from child to parent) then
# b and t should be the parents of whatever is in bpmn_processes and task_models.
return (b + bpmn_processes, t + task_models) return (b + bpmn_processes, t + task_models)
return (bpmn_processes, task_models) return (bpmn_processes, task_models)
@ -631,6 +562,19 @@ class TaskService:
bpmn_process_identifiers.append(bpmn_process.bpmn_process_definition.bpmn_identifier) bpmn_process_identifiers.append(bpmn_process.bpmn_process_definition.bpmn_identifier)
return bpmn_process_identifiers return bpmn_process_identifiers
@classmethod
def bpmn_process_for_called_activity_or_top_level_process(cls, task_model: TaskModel) -> BpmnProcessModel:
"""Returns either the bpmn process for the call activity calling the process or the top level bpmn process.
For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB.
processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with
the spec reference cache to find process_modelB.
"""
(bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes(
task_model, stop_on_first_call_activity=True
)
return bpmn_processes[0]
@classmethod @classmethod
def reset_task_model_dict( def reset_task_model_dict(
cls, cls,
@ -667,40 +611,19 @@ class TaskService:
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
@classmethod @classmethod
def _create_task( def insert_or_update_json_data_records(
cls, cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]
bpmn_process: BpmnProcessModel,
process_instance: ProcessInstanceModel,
spiff_task: SpiffTask,
bpmn_definition_to_task_definitions_mappings: dict,
) -> TaskModel:
task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][
spiff_task.task_spec.name
]
task_model = TaskModel(
guid=str(spiff_task.id),
bpmn_process_id=bpmn_process.id,
process_instance_id=process_instance.id,
task_definition_id=task_definition.id,
)
return task_model
@classmethod
def _get_python_env_data_dict_from_spiff_task(
cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer
) -> dict:
user_defined_state = spiff_task.workflow.script_engine.environment.user_defined_state()
# this helps to convert items like datetime objects to be json serializable
converted_data: dict = serializer.data_converter.convert(user_defined_state)
return converted_data
@classmethod
def update_json_data_dicts_using_list(
cls, json_data_dict_list: list[Optional[JsonDataDict]], json_data_dicts: dict[str, JsonDataDict]
) -> None: ) -> None:
for json_data_dict in json_data_dict_list: list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()]
if json_data_dict is not None: if len(list_of_dicts) > 0:
json_data_dicts[json_data_dict["hash"]] = json_data_dict on_duplicate_key_stmt = None
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql":
insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(data=insert_stmt.inserted.data)
else:
insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["hash"])
db.session.execute(on_duplicate_key_stmt)
@classmethod @classmethod
def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict: def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict:
@ -729,3 +652,46 @@ class TaskService:
@classmethod @classmethod
def get_name_for_display(cls, entity: Union[TaskDefinitionModel, BpmnProcessDefinitionModel]) -> str: def get_name_for_display(cls, entity: Union[TaskDefinitionModel, BpmnProcessDefinitionModel]) -> str:
return entity.bpmn_name or entity.bpmn_identifier return entity.bpmn_name or entity.bpmn_identifier
@classmethod
def _task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
top_level_workflow = spiff_task.workflow._get_outermost_workflow()
my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of
my_sp = None
my_sp_id = None
if my_wf != top_level_workflow:
# All the subprocesses are at the top level, so you can just compare them
for sp_id, sp in top_level_workflow.subprocesses.items():
if sp == my_wf:
my_sp = sp
my_sp_id = str(sp_id)
break
return (my_sp_id, my_sp)
@classmethod
def _create_task(
cls,
bpmn_process: BpmnProcessModel,
process_instance: ProcessInstanceModel,
spiff_task: SpiffTask,
bpmn_definition_to_task_definitions_mappings: dict,
) -> TaskModel:
task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][
spiff_task.task_spec.name
]
task_model = TaskModel(
guid=str(spiff_task.id),
bpmn_process_id=bpmn_process.id,
process_instance_id=process_instance.id,
task_definition_id=task_definition.id,
)
return task_model
@classmethod
def _get_python_env_data_dict_from_spiff_task(
cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer
) -> dict:
user_defined_state = spiff_task.workflow.script_engine.environment.user_defined_state()
# this helps to convert items like datetime objects to be json serializable
converted_data: dict = serializer.data_converter.convert(user_defined_state)
return converted_data