moved add_bpmn_process to task_service w/ burnettk

This commit is contained in:
jasquat 2023-03-10 12:23:27 -05:00
parent 6158733b59
commit 6d5c03a3d0
3 changed files with 81 additions and 83 deletions

View File

@ -1042,78 +1042,6 @@ class ProcessInstanceProcessor:
bpmn_process_definition_parent bpmn_process_definition_parent
) )
def _add_bpmn_process(
self,
bpmn_process_dict: dict,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
add_tasks_if_new_bpmn_process: bool = True,
) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
).first()
elif self.process_instance_model.bpmn_process_id is not None:
bpmn_process = self.process_instance_model.bpmn_process
bpmn_process_is_new = False
if bpmn_process is None:
bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
"utf8"
)
bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=bpmn_process_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data
)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
self.process_instance_model.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
if bpmn_process_is_new and add_tasks_if_new_bpmn_process:
# if True:
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties
TaskService.update_task_data_on_task_model(task_model, task_data_dict)
db.session.add(task_model)
return bpmn_process
def _add_bpmn_json_records(self) -> None: def _add_bpmn_json_records(self) -> None:
"""Adds serialized_bpmn_definition and process_instance_data records to the db session. """Adds serialized_bpmn_definition and process_instance_data records to the db session.
@ -1139,12 +1067,13 @@ class ProcessInstanceProcessor:
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
# if self.process_instance_model.bpmn_process_id is None: # if self.process_instance_model.bpmn_process_id is None:
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model)
for subprocess_task_id, subprocess_properties in subprocesses.items(): for subprocess_task_id, subprocess_properties in subprocesses.items():
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
print(f"subprocess_task_id: {subprocess_task_id}") print(f"subprocess_task_id: {subprocess_task_id}")
self._add_bpmn_process( TaskService.add_bpmn_process(
subprocess_properties, subprocess_properties,
self.process_instance_model,
bpmn_process_parent, bpmn_process_parent,
bpmn_process_guid=subprocess_task_id, bpmn_process_guid=subprocess_task_id,
) )
@ -1700,7 +1629,6 @@ class ProcessInstanceProcessor:
secondary_engine_step_delegate=step_delegate, secondary_engine_step_delegate=step_delegate,
serializer=self._serializer, serializer=self._serializer,
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
add_bpmn_process=self._add_bpmn_process,
) )
execution_strategy = execution_strategy_named( execution_strategy = execution_strategy_named(
execution_strategy_name, task_model_delegate execution_strategy_name, task_model_delegate

View File

@ -53,14 +53,14 @@ class TaskService:
@classmethod @classmethod
def find_or_create_task_model_from_spiff_task( def find_or_create_task_model_from_spiff_task(
cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, add_bpmn_process: Any serializer: BpmnWorkflowSerializer
) -> TaskModel: ) -> TaskModel:
spiff_task_guid = str(spiff_task.id) spiff_task_guid = str(spiff_task.id)
task_model: Optional[TaskModel] = TaskModel.query.filter_by( task_model: Optional[TaskModel] = TaskModel.query.filter_by(
guid=spiff_task_guid guid=spiff_task_guid
).first() ).first()
if task_model is None: if task_model is None:
bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer, add_bpmn_process) bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer)
task_model = TaskModel.query.filter_by( task_model = TaskModel.query.filter_by(
guid=spiff_task_guid guid=spiff_task_guid
).first() ).first()
@ -72,7 +72,7 @@ class TaskService:
return task_model return task_model
@classmethod @classmethod
def task_subprocess(cls, spiff_task: SpiffTask) -> Optional[Tuple[str, BpmnWorkflow]]: def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
top_level_workflow = spiff_task.workflow._get_outermost_workflow() top_level_workflow = spiff_task.workflow._get_outermost_workflow()
my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of
my_sp = None my_sp = None
@ -89,7 +89,7 @@ class TaskService:
@classmethod @classmethod
def task_bpmn_process( def task_bpmn_process(
cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, add_bpmn_process: Any serializer: BpmnWorkflowSerializer
) -> BpmnProcessModel: ) -> BpmnProcessModel:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task) subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
if subprocess is None: if subprocess is None:
@ -102,10 +102,82 @@ class TaskService:
).first() ).first()
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
if bpmn_process is None: if bpmn_process is None:
bpmn_process = add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance.bpmn_process, subprocess_guid, add_tasks_if_new_bpmn_process=True) bpmn_process = cls.add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance, process_instance.bpmn_process, subprocess_guid)
db.session.commit() db.session.commit()
# spiff_task_guid = str(spiff_task.id) # spiff_task_guid = str(spiff_task.id)
# raise Exception( # raise Exception(
# f"Could not find bpmn_process for task {spiff_task_guid}" # f"Could not find bpmn_process for task {spiff_task_guid}"
# ) # )
return bpmn_process return bpmn_process
@classmethod
def add_bpmn_process(
cls,
bpmn_process_dict: dict,
process_instance: ProcessInstanceModel,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
).first()
elif process_instance.bpmn_process_id is not None:
bpmn_process = process_instance.bpmn_process
bpmn_process_is_new = False
if bpmn_process is None:
bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
"utf8"
)
bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=bpmn_process_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data
)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
process_instance.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
if bpmn_process_is_new:
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties
TaskService.update_task_data_on_task_model(task_model, task_data_dict)
db.session.add(task_model)
return bpmn_process

View File

@ -51,14 +51,12 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
add_bpmn_process: Any = None,
) -> None: ) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance self.process_instance = process_instance
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.serializer = serializer self.serializer = serializer
self.add_bpmn_process = add_bpmn_process
def should_update_task_model(self) -> bool: def should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance. """We need to figure out if we have previously save task info on this process intance.
@ -72,7 +70,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.should_update_task_model(): if self.should_update_task_model():
self.current_task_model = ( self.current_task_model = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer, self.add_bpmn_process spiff_task, self.process_instance, self.serializer
) )
) )
self.current_task_model.start_in_seconds = time.time() self.current_task_model.start_in_seconds = time.time()