diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 6a36e38d2..bd946b83d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1617,7 +1617,8 @@ class ProcessInstanceProcessor: f"Cannot find a task with guid {self.process_instance_model.id} and task_id is {human_task.task_id}" ) - task_model.start_in_seconds = time.time() + run_started_at = time.time() + task_model.start_in_seconds = run_started_at task_exception = None task_event = ProcessInstanceEventType.task_completed.value try: @@ -1637,6 +1638,7 @@ class ProcessInstanceProcessor: process_instance=self.process_instance_model, serializer=self._serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + run_started_at=run_started_at, ) task_service.update_task_model(task_model, spiff_task) JsonDataModel.insert_or_update_json_data_records(task_service.json_data_dicts) @@ -1657,7 +1659,13 @@ class ProcessInstanceProcessor: spiff_task_to_process = spiff_task if spiff_task_to_process.triggered is True: spiff_task_to_process = spiff_task.parent - task_service.process_parents_and_children_and_save_to_database(spiff_task_to_process) + + tasks_to_update = self.bpmn_process_instance.get_tasks(updated_ts=run_started_at) + for spiff_task_to_update in tasks_to_update: + if spiff_task_to_update.id != spiff_task.id: + task_service.update_task_model_with_spiff_task(spiff_task_to_update) + + task_service.save_objects_to_database() # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) self.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 1effacebe..1b3a514aa 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -107,6 +107,7 @@ class TaskService: process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, bpmn_definition_to_task_definitions_mappings: dict, + run_started_at: float | None = None, ) -> None: self.process_instance = process_instance self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings @@ -117,6 +118,8 @@ class TaskService: self.json_data_dicts: dict[str, JsonDataDict] = {} self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} + self.run_started_at: float | None = run_started_at + def save_objects_to_database(self, save_process_instance_events: bool = True) -> None: db.session.bulk_save_objects(self.bpmn_processes.values()) db.session.bulk_save_objects(self.task_models.values()) @@ -208,8 +211,11 @@ class TaskService: task_model.start_in_seconds = start_and_end_times["start_in_seconds"] task_model.end_in_seconds = start_and_end_times["end_in_seconds"] - # let failed tasks raise and we will log the event then - if task_model.state in ["COMPLETED", "CANCELLED"]: + # let failed tasks raise and we will log the event then. + # avoid creating events for the same state transition multiple times to avoid multiple cancelled events + if task_model.state in ["COMPLETED", "CANCELLED"] and ( + self.run_started_at is None or spiff_task.last_state_change >= self.run_started_at + ): event_type = ProcessInstanceEventType.task_completed.value if task_model.state == "CANCELLED": event_type = ProcessInstanceEventType.task_cancelled.value @@ -438,7 +444,6 @@ class TaskService: if spiff_task.has_state(TaskState.PREDICTED_MASK): self.__class__.remove_spiff_task_from_parent(spiff_task, self.task_models) continue - task_model = TaskModel.query.filter_by(guid=task_id).first() if task_model is None: task_model = self.__class__._create_task( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 2ddb3ff34..1c75d5717 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -201,12 +201,11 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.spiff_tasks_to_process: set[UUID] = set() self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {} - self.run_started_at = time.time() - self.task_service = TaskService( process_instance=self.process_instance, serializer=self.serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + run_started_at=time.time(), ) def will_complete_task(self, spiff_task: SpiffTask) -> None: @@ -252,6 +251,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): # 1ead87b4b496525df8cc0e27836c3e987d593dc0 if you are curious. for waiting_spiff_task in bpmn_process_instance.get_tasks( state=TaskState.WAITING + | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY @@ -261,21 +261,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): ): self.task_service.update_task_model_with_spiff_task(waiting_spiff_task) - # FIXME: this may have broken error boundary events getting cancelled. - # Getting all cancelled tasks to see if that fixes it - # - # only process cancelled tasks that were cancelled during this run - # NOTE: this could mean we do not add task models that we should be adding - # in which case we may have to remove the updated_ts filter here and - # instead just avoid creating the event in update_task_model_with_spiff_task - cancelled_spiff_tasks = bpmn_process_instance.get_tasks( - state=TaskState.CANCELLED # , updated_ts=self.run_started_at - ) - for cancelled_spiff_task in cancelled_spiff_tasks: - self.task_service.update_task_model_with_spiff_task( - spiff_task=cancelled_spiff_task, - ) - self.task_service.save_objects_to_database() if self.secondary_engine_step_delegate: