From 47f94dccbb1119cb6cda034efab350e87e0fcfab Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Wed, 27 Sep 2023 17:04:47 -0400 Subject: [PATCH] Feature/nonetype debug stuff (#513) * some debug items w/ burnettk * removed some db commits from workflow execution service so the passed in save is the only thing that actually commits the transaction * set max depth to 50000 for now w/ burnettk * pyl w/ burnettk * use temp maxdepth fix in SpiffWorkflow w/ burnettk --------- Co-authored-by: jasquat --- spiffworkflow-backend/poetry.lock | 8 +++--- spiffworkflow-backend/pyproject.toml | 2 +- .../spiffworkflow_backend/config/default.py | 2 ++ .../spiffworkflow_backend/models/json_data.py | 1 - .../routes/process_instances_controller.py | 3 ++- .../services/process_instance_processor.py | 17 +++++++++++- .../services/process_model_service.py | 1 + .../services/task_service.py | 1 - .../services/workflow_execution_service.py | 27 +++++++------------ 9 files changed, 36 insertions(+), 26 deletions(-) diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 1ed24576..144fd332 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -2352,7 +2352,7 @@ files = [ [[package]] name = "SpiffWorkflow" version = "2.0.0rc0" -description = "" +description = "A workflow framework and BPMN/DMN Processor" optional = false python-versions = "*" files = [] @@ -2365,8 +2365,8 @@ lxml = "*" [package.source] type = "git" url = "https://github.com/sartography/SpiffWorkflow" -reference = "main" -resolved_reference = "90159bd23c3c74fcf480414473e851460a01e92c" +reference = "temp/short-term-max-depth-fix" +resolved_reference = "56f0ba545b2c0f16a4ce7afac951fd3694fb9040" [[package]] name = "sqlalchemy" @@ -2795,4 +2795,4 @@ tests-strict = ["codecov (==2.0.15)", "pytest (==4.6.0)", "pytest (==4.6.0)", "p [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.12" -content-hash = "38add9297ce0c445da94865bbdf82bb37c95b57e056ec195b3c68704236fcec5" +content-hash = "52d201da1a56af3fad40ecd730548993d93a45023d4a94471112adf4e9bd54b5" diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index c09bb4ef..6c1b9c7e 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -29,7 +29,7 @@ flask-migrate = "*" flask-restful = "*" flask-simple-crypt = "^0.3.3" werkzeug = "*" -SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} +SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "temp/short-term-max-depth-fix"} # SpiffWorkflow = {develop = true, path = "../../spiffworkflow/" } # SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow/" } sentry-sdk = "^1.10" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 6d734304..f6cc0c53 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -155,6 +155,8 @@ config_from_env( "SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID", default="Message_SystemMessageNotification", ) +# check all tasks listed as child tasks are saved to the database +config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False) ### for documentation only # we load the CustomBpmnScriptEngine at import time, where we do not have access to current_app, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py index c5805a60..7f8d2eb4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py @@ -83,5 +83,4 @@ class JsonDataModel(SpiffworkflowBaseDBModel): def create_and_insert_json_data_from_dict(cls, data: dict) -> str: json_data_hash = sha256(json.dumps(data, sort_keys=True).encode("utf8")).hexdigest() cls.insert_or_update_json_data_dict({"hash": json_data_hash, "data": data}) - db.session.commit() return json_data_hash diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index f60f3f8d..68afaa74 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -246,9 +246,10 @@ def process_instance_list( ) json_data_hash = JsonDataModel.create_and_insert_json_data_from_dict(body["report_metadata"]) - response_json["report_hash"] = json_data_hash db.session.commit() + response_json["report_hash"] = json_data_hash + return make_response(jsonify(response_json), 200) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index b1ab4906..20881248 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1026,7 +1026,6 @@ class ProcessInstanceProcessor: def save(self) -> None: """Saves the current state of this processor to the database.""" self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION - self.process_instance_model.status = self.get_status().value current_app.logger.debug( f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}" @@ -1043,6 +1042,7 @@ class ProcessInstanceProcessor: db.session.add(self.process_instance_model) db.session.commit() + human_tasks = HumanTaskModel.query.filter_by( process_instance_id=self.process_instance_model.id, completed=False ).all() @@ -1477,6 +1477,7 @@ class ProcessInstanceProcessor: self.save, ) execution_service.run_and_save(exit_at, save) + self.check_all_tasks() @classmethod def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: @@ -1791,3 +1792,17 @@ class ProcessInstanceProcessor: self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value ) db.session.commit() + + def check_all_tasks(self) -> None: + if current_app.config["SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY"] is not True: + return + tasks = TaskModel.query.filter_by(process_instance_id=self.process_instance_model.id).all() + missing_child_guids = [] + for task in tasks: + for child_task_guid in task.properties_json["children"]: + child_task = TaskModel.query.filter_by(guid=child_task_guid).first() + if child_task is None: + missing_child_guids.append(f"Missing child guid {child_task_guid} for {task.properties_json}") + + if len(missing_child_guids) > 0: + raise Exception(missing_child_guids) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_service.py index 6b18e690..d6ab2461 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_service.py @@ -18,6 +18,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_model import PROCESS_MODEL_SUPPORTED_KEYS_FOR_DISK_SERIALIZATION from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.file_system_service import FileSystemService diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 9ab628f3..237ba9e4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -463,7 +463,6 @@ class TaskService: # otherwise sqlalchemy returns several warnings. for task in human_tasks_to_clear + tasks_to_clear: db.session.delete(task) - db.session.commit() bpmn_processes_to_delete = ( BpmnProcessModel.query.filter(BpmnProcessModel.guid.in_(deleted_task_guids)) # type: ignore diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 15433f8e..d076b8b8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -62,7 +62,7 @@ class EngineStepDelegate: pass @abstractmethod - def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None: + def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None: pass @abstractmethod @@ -162,8 +162,8 @@ class ExecutionStrategy: def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: self.delegate.on_exception(bpmn_process_instance) - def save(self, bpmn_process_instance: BpmnWorkflow) -> None: - self.delegate.save(bpmn_process_instance) + def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.add_object_to_db_session(bpmn_process_instance) def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: tasks = [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual] @@ -240,7 +240,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) - def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None: + def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None: # NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace # excludes COMPLETED. the others were required to get PP1 to go to completion. # process FUTURE tasks because Boundary events are not processed otherwise. @@ -249,6 +249,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): # but it didn't quite work in all cases, so we deleted it. you can find it in commit # 1ead87b4b496525df8cc0e27836c3e987d593dc0 if you are curious. for waiting_spiff_task in bpmn_process_instance.get_tasks( + max_depth=50000, # TODO: remove when SpiffWorkflow depth calculation is corrected state=TaskState.WAITING | TaskState.CANCELLED | TaskState.READY @@ -256,15 +257,14 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.LIKELY | TaskState.FUTURE | TaskState.STARTED - | TaskState.ERROR + | TaskState.ERROR, ): self.task_service.update_task_model_with_spiff_task(waiting_spiff_task) self.task_service.save_objects_to_database() if self.secondary_engine_step_delegate: - self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) - db.session.commit() + self.secondary_engine_step_delegate.add_object_to_db_session(bpmn_process_instance) def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: pass @@ -380,9 +380,8 @@ class WorkflowExecutionService: if self.bpmn_process_instance.is_completed(): self.process_instance_completer(self.bpmn_process_instance) - if self.process_instance_model.persistence_level != "none": - self.process_bpmn_messages() - self.queue_waiting_receive_messages() + self.process_bpmn_messages() + self.queue_waiting_receive_messages() except WorkflowTaskException as wte: ProcessInstanceTmpService.add_event_to_process_instance( self.process_instance_model, @@ -398,9 +397,7 @@ class WorkflowExecutionService: finally: if self.process_instance_model.persistence_level != "none": - self.execution_strategy.save(self.bpmn_process_instance) - db.session.commit() - + self.execution_strategy.add_object_to_db_session(self.bpmn_process_instance) if save: self.process_instance_saver() @@ -432,8 +429,6 @@ class WorkflowExecutionService: bpmn_process.properties_json["bpmn_events"] = [] db.session.add(bpmn_process) - db.session.commit() - def queue_waiting_receive_messages(self) -> None: waiting_events = self.bpmn_process_instance.waiting_events() waiting_message_events = filter(lambda e: e.event_type == "MessageEventDefinition", waiting_events) @@ -474,8 +469,6 @@ class WorkflowExecutionService: bpmn_process.properties_json["correlations"] = bpmn_process_correlations db.session.add(bpmn_process) - db.session.commit() - class ProfiledWorkflowExecutionService(WorkflowExecutionService): """A profiled version of the workflow execution service."""