all tests are passing w/ burnettk

This commit is contained in:
jasquat 2023-03-15 11:56:00 -04:00
parent ceca4d1333
commit 14fc7debc1
5 changed files with 76 additions and 82 deletions

View File

@ -1,4 +1,5 @@
from __future__ import annotations from __future__ import annotations
from sqlalchemy.orm import relationship
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
@ -24,6 +25,8 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
properties_json: dict = db.Column(db.JSON, nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
tasks = relationship("TaskModel", cascade="delete") # type: ignore
# subprocess or top_level_process # subprocess or top_level_process
# process_type: str = db.Column(db.String(30), nullable=False) # process_type: str = db.Column(db.String(30), nullable=False)

View File

@ -71,7 +71,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
bpmn_process_id: int | None = db.Column( bpmn_process_id: int | None = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore
) )
bpmn_process = relationship(BpmnProcessModel) bpmn_process = relationship(BpmnProcessModel, cascade="delete")
tasks = relationship("TaskModel", cascade="delete") # type: ignore
spiff_serializer_version = db.Column(db.String(50), nullable=True) spiff_serializer_version = db.Column(db.String(50), nullable=True)

View File

@ -462,10 +462,10 @@ class ProcessInstanceProcessor:
# this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id # this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id
# in the database. This is to cut down on database queries while adding new tasks to the database. # in the database. This is to cut down on database queries while adding new tasks to the database.
# Structure: # Structure:
# { "bpmn_process_definition_identifier": { "task_identifier": bpmn_process_id } } # { "bpmn_process_definition_identifier": { "task_identifier": task_definition } }
# To use from a spiff_task: # To use from a spiff_task:
# [spiff_task.workflow.spec.name][spiff_task.task_spec.name] # [spiff_task.workflow.spec.name][spiff_task.task_spec.name]
self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = {} self.bpmn_definition_to_task_definitions_mappings = {}
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.bpmn_process_definition_id is None: if process_instance_model.bpmn_process_definition_id is None:
@ -482,7 +482,7 @@ class ProcessInstanceProcessor:
) )
try: try:
(self.bpmn_process_instance, self.full_bpmn_process_dict, self.bpmn_definition_identifiers_to_bpmn_process_id_mappings) = ( (self.bpmn_process_instance, self.full_bpmn_process_dict, self.bpmn_definition_to_task_definitions_mappings) = (
self.__get_bpmn_process_instance( self.__get_bpmn_process_instance(
process_instance_model, process_instance_model,
bpmn_process_spec, bpmn_process_spec,
@ -549,18 +549,18 @@ class ProcessInstanceProcessor:
@classmethod @classmethod
def _update_bpmn_definition_mappings( def _update_bpmn_definition_mappings(
cls, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict, bpmn_process_definition_identifier: str, task_definition: TaskDefinitionModel cls, bpmn_definition_to_task_definitions_mappings: dict, bpmn_process_definition_identifier: str, task_definition: TaskDefinitionModel
) -> None: ) -> None:
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
# if bpmn_process_definition_identifier == 'test_process_to_call' and task_definition.bpmn_identifier == "Root": # if bpmn_process_definition_identifier == 'test_process_to_call' and task_definition.bpmn_identifier == "Root":
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
if bpmn_process_definition_identifier not in bpmn_definition_identifiers_to_bpmn_process_id_mappings: if bpmn_process_definition_identifier not in bpmn_definition_to_task_definitions_mappings:
bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier] = {} bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier] = {}
bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier][task_definition.bpmn_identifier] = task_definition bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier][task_definition.bpmn_identifier] = task_definition
@classmethod @classmethod
def _get_definition_dict_for_bpmn_process_definition( def _get_definition_dict_for_bpmn_process_definition(
cls, bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict cls, bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_definition_to_task_definitions_mappings: dict
) -> dict: ) -> dict:
task_definitions = TaskDefinitionModel.query.filter_by( task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id bpmn_process_definition_id=bpmn_process_definition.id
@ -571,7 +571,7 @@ class ProcessInstanceProcessor:
bpmn_process_definition_dict["task_specs"][ bpmn_process_definition_dict["task_specs"][
task_definition.bpmn_identifier task_definition.bpmn_identifier
] = task_definition.properties_json ] = task_definition.properties_json
cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_process_definition.bpmn_identifier, task_definition) cls._update_bpmn_definition_mappings(bpmn_definition_to_task_definitions_mappings, bpmn_process_definition.bpmn_identifier, task_definition)
return bpmn_process_definition_dict return bpmn_process_definition_dict
@classmethod @classmethod
@ -579,7 +579,7 @@ class ProcessInstanceProcessor:
cls, cls,
bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_process_definition: BpmnProcessDefinitionModel,
spiff_bpmn_process_dict: dict, spiff_bpmn_process_dict: dict,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict, bpmn_definition_to_task_definitions_mappings: dict,
) -> None: ) -> None:
# find all child subprocesses of a process # find all child subprocesses of a process
bpmn_process_subprocess_definitions = ( bpmn_process_subprocess_definitions = (
@ -618,7 +618,7 @@ class ProcessInstanceProcessor:
task_definition.bpmn_process_definition_id task_definition.bpmn_process_definition_id
] ]
) )
cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_subprocess_definition_bpmn_identifier, task_definition) cls._update_bpmn_definition_mappings(bpmn_definition_to_task_definitions_mappings, bpmn_subprocess_definition_bpmn_identifier, task_definition)
spiff_bpmn_process_dict["subprocess_specs"][ spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition_bpmn_identifier bpmn_subprocess_definition_bpmn_identifier
]["task_specs"][ ]["task_specs"][
@ -667,7 +667,7 @@ class ProcessInstanceProcessor:
@classmethod @classmethod
def _get_full_bpmn_process_dict( def _get_full_bpmn_process_dict(
cls, process_instance_model: ProcessInstanceModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict cls, process_instance_model: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict
) -> dict: ) -> dict:
if process_instance_model.bpmn_process_definition_id is None: if process_instance_model.bpmn_process_definition_id is None:
return {} return {}
@ -682,11 +682,11 @@ class ProcessInstanceProcessor:
if bpmn_process_definition is not None: if bpmn_process_definition is not None:
spiff_bpmn_process_dict["spec"] = ( spiff_bpmn_process_dict["spec"] = (
cls._get_definition_dict_for_bpmn_process_definition( cls._get_definition_dict_for_bpmn_process_definition(
bpmn_process_definition, bpmn_definition_identifiers_to_bpmn_process_id_mappings bpmn_process_definition, bpmn_definition_to_task_definitions_mappings
) )
) )
cls._set_definition_dict_for_bpmn_subprocess_definitions( cls._set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition, spiff_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings bpmn_process_definition, spiff_bpmn_process_dict, bpmn_definition_to_task_definitions_mappings
) )
bpmn_process = process_instance_model.bpmn_process bpmn_process = process_instance_model.bpmn_process
@ -755,8 +755,8 @@ class ProcessInstanceProcessor:
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> Tuple[BpmnWorkflow, dict, dict]: ) -> Tuple[BpmnWorkflow, dict, dict]:
full_bpmn_process_dict = {} full_bpmn_process_dict = {}
bpmn_definition_identifiers_to_bpmn_process_id_mappings = {} bpmn_definition_to_task_definitions_mappings = {}
print("GET BPMN PROCESS INSTANCE") # print("GET BPMN PROCESS INSTANCE")
if process_instance_model.bpmn_process_definition_id is not None: if process_instance_model.bpmn_process_definition_id is not None:
# turn off logging to avoid duplicated spiff logs # turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff") spiff_logger = logging.getLogger("spiff")
@ -766,10 +766,10 @@ class ProcessInstanceProcessor:
try: try:
full_bpmn_process_dict = ( full_bpmn_process_dict = (
ProcessInstanceProcessor._get_full_bpmn_process_dict( ProcessInstanceProcessor._get_full_bpmn_process_dict(
process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings process_instance_model, bpmn_definition_to_task_definitions_mappings
) )
) )
print("WE GOT FULL BPMN PROCESS DICT") # print("WE GOT FULL BPMN PROCESS DICT")
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor._serializer.workflow_from_dict( ProcessInstanceProcessor._serializer.workflow_from_dict(
full_bpmn_process_dict full_bpmn_process_dict
@ -782,7 +782,7 @@ class ProcessInstanceProcessor:
ProcessInstanceProcessor.set_script_engine(bpmn_process_instance) ProcessInstanceProcessor.set_script_engine(bpmn_process_instance)
else: else:
print("WE NO HAVE FULL BPMN YET") # print("WE NO HAVE FULL BPMN YET")
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec( ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec(
spec, subprocesses spec, subprocesses
@ -792,7 +792,7 @@ class ProcessInstanceProcessor:
bpmn_process_instance.data[ bpmn_process_instance.data[
ProcessInstanceProcessor.VALIDATION_PROCESS_KEY ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
] = validate_only ] = validate_only
return (bpmn_process_instance, full_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings) return (bpmn_process_instance, full_bpmn_process_dict, bpmn_definition_to_task_definitions_mappings)
def slam_in_data(self, data: dict) -> None: def slam_in_data(self, data: dict) -> None:
"""Slam_in_data.""" """Slam_in_data."""
@ -1063,16 +1063,14 @@ class ProcessInstanceProcessor:
bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = ( bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = (
BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first() BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first()
) )
print(f"process_bpmn_properties: {process_bpmn_properties}") # print(f"process_bpmn_properties: {process_bpmn_properties}")
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
# if process_bpmn_identifier == "test_process_to_call": # if process_bpmn_identifier == "test_process_to_call":
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
# print("HEY22") # # print("HEY22")
print(f"self.process_instance_model.id: {self.process_instance_model.id}") # print(f"self.process_instance_model.id: {self.process_instance_model.id}")
if bpmn_process_definition is None: if bpmn_process_definition is None:
# import pdb; pdb.set_trace()
print("NO DEFINITION")
task_specs = process_bpmn_properties.pop("task_specs") task_specs = process_bpmn_properties.pop("task_specs")
bpmn_process_definition = BpmnProcessDefinitionModel( bpmn_process_definition = BpmnProcessDefinitionModel(
hash=new_hash_digest, hash=new_hash_digest,
@ -1090,13 +1088,13 @@ class ProcessInstanceProcessor:
) )
db.session.add(task_definition) db.session.add(task_definition)
if store_bpmn_definition_mappings: if store_bpmn_definition_mappings:
self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition) self._update_bpmn_definition_mappings(self.bpmn_definition_to_task_definitions_mappings, process_bpmn_identifier, task_definition)
elif store_bpmn_definition_mappings: elif store_bpmn_definition_mappings:
# this should only ever happen when new process instances use a pre-existing bpmn process definitions # this should only ever happen when new process instances use a pre-existing bpmn process definitions
# otherwise this should get populated on processor initialization # otherwise this should get populated on processor initialization
task_definitions = TaskDefinitionModel.query.filter_by(bpmn_process_definition_id=bpmn_process_definition.id).all() task_definitions = TaskDefinitionModel.query.filter_by(bpmn_process_definition_id=bpmn_process_definition.id).all()
for task_definition in task_definitions: for task_definition in task_definitions:
self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition) self._update_bpmn_definition_mappings(self.bpmn_definition_to_task_definitions_mappings, process_bpmn_identifier, task_definition)
if bpmn_process_definition_parent is not None: if bpmn_process_definition_parent is not None:
bpmn_process_definition_relationship = ( bpmn_process_definition_relationship = (
@ -1115,8 +1113,8 @@ class ProcessInstanceProcessor:
def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None: def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None:
# store only if mappings is currently empty. this also would mean this is a new instance that has never saved before # store only if mappings is currently empty. this also would mean this is a new instance that has never saved before
print("WE STORE BPM PROCESS DEF") # print("WE STORE BPM PROCESS DEF")
store_bpmn_definition_mappings = not self.bpmn_definition_identifiers_to_bpmn_process_id_mappings store_bpmn_definition_mappings = not self.bpmn_definition_to_task_definitions_mappings
bpmn_process_definition_parent = self._store_bpmn_process_definition( bpmn_process_definition_parent = self._store_bpmn_process_definition(
bpmn_spec_dict["spec"], store_bpmn_definition_mappings=store_bpmn_definition_mappings bpmn_spec_dict["spec"], store_bpmn_definition_mappings=store_bpmn_definition_mappings
) )
@ -1134,7 +1132,7 @@ class ProcessInstanceProcessor:
Expects the save method to commit it. Expects the save method to commit it.
""" """
print("WE SAVE THINGS") # print("WE SAVE THINGS")
bpmn_dict = self.serialize() bpmn_dict = self.serialize()
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {} process_instance_data_dict = {}
@ -1149,13 +1147,16 @@ class ProcessInstanceProcessor:
# if self.process_instance_model.bpmn_process_definition_id is None: # if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict) self._add_bpmn_process_definitions(bpmn_spec_dict)
# import pdb; pdb.set_trace() # import pdb; pdb.set_trace()
print("WE NOW STORE BPMN PROCESS STUFFS") # print("WE NOW STORE BPMN PROCESS STUFFS")
print(f"bpmn_definition_identifiers_to_bpmn_process_id_mappings: {self.bpmn_definition_identifiers_to_bpmn_process_id_mappings}") # print(f"bpmn_definition_to_task_definitions_mappings: {self.bpmn_definition_to_task_definitions_mappings}")
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent, new_task_models, new_json_data_dicts = ( bpmn_process_parent, new_task_models, new_json_data_dicts = (
TaskService.add_bpmn_process( TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, spiff_workflow=self.bpmn_process_instance bpmn_process_dict=process_instance_data_dict,
process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
spiff_workflow=self.bpmn_process_instance
) )
) )
for subprocess_task_id, subprocess_properties in subprocesses.items(): for subprocess_task_id, subprocess_properties in subprocesses.items():
@ -1164,11 +1165,11 @@ class ProcessInstanceProcessor:
subprocess_new_task_models, subprocess_new_task_models,
subprocess_new_json_data_models, subprocess_new_json_data_models,
) = TaskService.add_bpmn_process( ) = TaskService.add_bpmn_process(
subprocess_properties, bpmn_process_dict=subprocess_properties,
self.process_instance_model, process_instance=self.process_instance_model,
bpmn_process_parent, bpmn_process_parent=bpmn_process_parent,
bpmn_process_guid=subprocess_task_id, bpmn_process_guid=subprocess_task_id,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
spiff_workflow=self.bpmn_process_instance spiff_workflow=self.bpmn_process_instance
) )
new_task_models.update(subprocess_new_task_models) new_task_models.update(subprocess_new_task_models)
@ -1179,7 +1180,7 @@ class ProcessInstanceProcessor:
def save(self) -> None: def save(self) -> None:
"""Saves the current state of this processor to the database.""" """Saves the current state of this processor to the database."""
print("WE IN SAVE") # print("WE IN SAVE")
self._add_bpmn_json_records() self._add_bpmn_json_records()
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
@ -1307,7 +1308,7 @@ class ProcessInstanceProcessor:
try: try:
self.bpmn_process_instance.catch(event_definition) self.bpmn_process_instance.catch(event_definition)
except Exception as e: except Exception as e:
print(e) print(e)
# TODO: do_engine_steps without a lock # TODO: do_engine_steps without a lock
self.do_engine_steps(save=True) self.do_engine_steps(save=True)
@ -1689,7 +1690,7 @@ class ProcessInstanceProcessor:
secondary_engine_step_delegate=step_delegate, secondary_engine_step_delegate=step_delegate,
serializer=self._serializer, serializer=self._serializer,
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
if execution_strategy_name is None: if execution_strategy_name is None:

View File

@ -51,7 +51,6 @@ class TaskService:
task_model: TaskModel, task_model: TaskModel,
spiff_task: SpiffTask, spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> Optional[JsonDataDict]: ) -> Optional[JsonDataDict]:
"""Updates properties_json and data on given task_model. """Updates properties_json and data on given task_model.
@ -73,7 +72,7 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[ ) -> Tuple[
Optional[BpmnProcessModel], Optional[BpmnProcessModel],
TaskModel, TaskModel,
@ -89,11 +88,11 @@ class TaskService:
new_json_data_dicts: dict[str, JsonDataDict] = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
if task_model is None: if task_model is None:
bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process(
spiff_task, process_instance, serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings spiff_task, process_instance, serializer, bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings
) )
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
if task_model is None: if task_model is None:
task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
task_model = TaskModel( task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id guid=spiff_task_guid, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id
) )
@ -122,7 +121,7 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task) subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None bpmn_process: Optional[BpmnProcessModel] = None
@ -136,11 +135,11 @@ class TaskService:
spiff_workflow = spiff_task.workflow._get_outermost_workflow() spiff_workflow = spiff_task.workflow._get_outermost_workflow()
bpmn_process, new_task_models, new_json_data_dicts = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict( bpmn_process_dict=serializer.workflow_to_dict(
spiff_workflow spiff_workflow
), ),
process_instance, process_instance=process_instance,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
spiff_workflow=spiff_workflow, spiff_workflow=spiff_workflow,
) )
) )
@ -152,11 +151,11 @@ class TaskService:
spiff_workflow = spiff_task.workflow spiff_workflow = spiff_task.workflow
bpmn_process, new_task_models, new_json_data_dicts = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess), bpmn_process_dict=serializer.workflow_to_dict(subprocess),
process_instance, process_instance=process_instance,
process_instance.bpmn_process, bpmn_process_parent=process_instance.bpmn_process,
subprocess_guid, bpmn_process_guid=subprocess_guid,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
spiff_workflow=spiff_workflow, spiff_workflow=spiff_workflow,
) )
@ -168,10 +167,10 @@ class TaskService:
cls, cls,
bpmn_process_dict: dict, bpmn_process_dict: dict,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
spiff_workflow: BpmnWorkflow,
bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
spiff_workflow: Optional[BpmnWorkflow] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
"""This creates and adds a bpmn_process to the Db session. """This creates and adds a bpmn_process to the Db session.
@ -185,7 +184,6 @@ class TaskService:
new_json_data_dicts: dict[str, JsonDataDict] = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
bpmn_process = None bpmn_process = None
print("ADD BPMN PROCESS")
if bpmn_process_parent is not None: if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by( bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
@ -208,7 +206,6 @@ class TaskService:
bpmn_process_data_json.encode("utf8") bpmn_process_data_json.encode("utf8")
).hexdigest() ).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash: if bpmn_process.json_data_hash != bpmn_process_data_hash:
# print(f"bpmn_process_data_dict: {bpmn_process_data_dict}")
new_json_data_dicts[bpmn_process_data_hash] = { new_json_data_dicts[bpmn_process_data_hash] = {
"hash": bpmn_process_data_hash, "hash": bpmn_process_data_hash,
"data": bpmn_process_data_dict, "data": bpmn_process_data_dict,
@ -232,32 +229,15 @@ class TaskService:
task_properties['parent'] = None task_properties['parent'] = None
process_dict = bpmn_process.properties_json process_dict = bpmn_process.properties_json
process_dict['root'] = task_id process_dict['root'] = task_id
# print(f"process_dict: {process_dict}")
bpmn_process.properties_json = process_dict bpmn_process.properties_json = process_dict
# print(f"bpmn_process.properties_json: {bpmn_process.properties_json}")
db.session.add(bpmn_process) db.session.add(bpmn_process)
task_data_dict = task_properties.pop("data") task_data_dict = task_properties.pop("data")
state_int = task_properties["state"] state_int = task_properties["state"]
task_model = TaskModel.query.filter_by(guid=task_id).first() task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None: if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
spiff_task = spiff_workflow.get_task(UUID(task_id)) spiff_task = spiff_workflow.get_task(UUID(task_id))
try: task_model = cls._create_task(bpmn_process, process_instance, spiff_task, bpmn_definition_to_task_definitions_mappings)
task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
except Exception as ex:
import pdb; pdb.set_trace()
print("HEY")
raise ex
task_model = TaskModel(
guid=task_id, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id
)
task_model.state = TaskStateNames[state_int] task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties task_model.properties_json = task_properties
@ -281,3 +261,12 @@ class TaskService:
json_data_dict = {"hash": task_data_hash, "data": task_data_dict} json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
task_model.json_data_hash = task_data_hash task_model.json_data_hash = task_data_hash
return json_data_dict return json_data_dict
@classmethod
def _create_task(cls, bpmn_process: BpmnProcessModel, process_instance: ProcessInstanceModel, spiff_task: SpiffTask, bpmn_definition_to_task_definitions_mappings: dict) -> TaskModel:
task_definition = bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
task_model = TaskModel(
guid=str(spiff_task.id), bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id
)
return task_model

View File

@ -57,12 +57,12 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self, self,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> None: ) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance self.process_instance = process_instance
self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = bpmn_definition_identifiers_to_bpmn_process_id_mappings self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.task_models: dict[str, TaskModel] = {} self.task_models: dict[str, TaskModel] = {}
@ -80,7 +80,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.should_update_task_model(): if self.should_update_task_model():
_bpmn_process, task_model, new_task_models, new_json_data_dicts = ( _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings spiff_task, self.process_instance, self.serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings
) )
) )
self.current_task_model = task_model self.current_task_model = task_model
@ -94,7 +94,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.current_task_model and self.should_update_task_model(): if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time() self.current_task_model.end_in_seconds = time.time()
json_data_dict = TaskService.update_task_model( json_data_dict = TaskService.update_task_model(
self.current_task_model, spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings self.current_task_model, spiff_task, self.serializer
) )
if json_data_dict is not None: if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
@ -123,13 +123,13 @@ class TaskModelSavingDelegate(EngineStepDelegate):
): ):
_bpmn_process, task_model, new_task_models, new_json_data_dicts = ( _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings waiting_spiff_task, self.process_instance, self.serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings
) )
) )
self.task_models.update(new_task_models) self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts) self.json_data_dicts.update(new_json_data_dicts)
json_data_dict = TaskService.update_task_model( json_data_dict = TaskService.update_task_model(
task_model, waiting_spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings task_model, waiting_spiff_task, self.serializer
) )
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model
if json_data_dict is not None: if json_data_dict is not None: