use last_state_change to figure out if a cancelled task needs an event in the task service instead of with get_tasks and check when manually completing tasks w/ burnettk (#595)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
jasquat 2023-10-26 16:11:16 -04:00 committed by GitHub
parent eb3fd9e2af
commit 24741b29de
3 changed files with 20 additions and 22 deletions
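
The gist of the change, distilled into a minimal runnable sketch (the helper name should_create_task_event is hypothetical; the states and the last_state_change-vs-run_started_at comparison mirror the TaskService diff below): a COMPLETED or CANCELLED event is only recorded when the task actually reached that state during the current run, so re-saving a task that was cancelled in an earlier run no longer emits a duplicate task_cancelled event.

import time

# Hypothetical helper distilling the new check in TaskService.
# Only record a COMPLETED/CANCELLED event if the task's state changed
# during this run (or if no run start time is known).
def should_create_task_event(
    task_state: str,
    last_state_change: float,
    run_started_at: float | None,
) -> bool:
    if task_state not in ("COMPLETED", "CANCELLED"):
        return False
    return run_started_at is None or last_state_change >= run_started_at

run_started_at = time.time()
# Cancelled before this run started: skip, no duplicate event.
assert not should_create_task_event("CANCELLED", run_started_at - 60.0, run_started_at)
# Cancelled during this run: record the event.
assert should_create_task_event("CANCELLED", time.time(), run_started_at)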


@@ -1617,7 +1617,8 @@ class ProcessInstanceProcessor:
                 f"Cannot find a task with guid {self.process_instance_model.id} and task_id is {human_task.task_id}"
             )
 
-        task_model.start_in_seconds = time.time()
+        run_started_at = time.time()
+        task_model.start_in_seconds = run_started_at
         task_exception = None
         task_event = ProcessInstanceEventType.task_completed.value
         try:
@@ -1637,6 +1638,7 @@ class ProcessInstanceProcessor:
             process_instance=self.process_instance_model,
             serializer=self._serializer,
             bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
+            run_started_at=run_started_at,
         )
         task_service.update_task_model(task_model, spiff_task)
         JsonDataModel.insert_or_update_json_data_records(task_service.json_data_dicts)
@@ -1657,7 +1659,13 @@ class ProcessInstanceProcessor:
             spiff_task_to_process = spiff_task
             if spiff_task_to_process.triggered is True:
                 spiff_task_to_process = spiff_task.parent
             task_service.process_parents_and_children_and_save_to_database(spiff_task_to_process)
+
+            tasks_to_update = self.bpmn_process_instance.get_tasks(updated_ts=run_started_at)
+            for spiff_task_to_update in tasks_to_update:
+                if spiff_task_to_update.id != spiff_task.id:
+                    task_service.update_task_model_with_spiff_task(spiff_task_to_update)
+            task_service.save_objects_to_database()
 
             # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
             self.save()
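
For context on the updated_ts filter used above: SpiffWorkflow's get_tasks can filter by when a task's state last changed, so capturing run_started_at up front lets manual completion rewrite only the task models this run actually touched. A rough sketch of that pattern with stand-in classes (FakeTask and FakeWorkflow are assumptions for illustration; the real call is self.bpmn_process_instance.get_tasks(updated_ts=run_started_at)):

import time
from dataclasses import dataclass, field

@dataclass
class FakeTask:
    id: int
    last_state_change: float

@dataclass
class FakeWorkflow:
    tasks: list[FakeTask] = field(default_factory=list)

    def get_tasks(self, updated_ts: float) -> list[FakeTask]:
        # Mirrors the updated_ts filter: only tasks whose state changed
        # at or after the given timestamp are returned.
        return [t for t in self.tasks if t.last_state_change >= updated_ts]

run_started_at = time.time()
workflow = FakeWorkflow([FakeTask(1, run_started_at - 60.0), FakeTask(2, run_started_at + 1.0)])
# Only task 2 changed during this run, so only it would be re-saved.
assert [t.id for t in workflow.get_tasks(updated_ts=run_started_at)] == [2]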


@@ -107,6 +107,7 @@ class TaskService:
         process_instance: ProcessInstanceModel,
         serializer: BpmnWorkflowSerializer,
         bpmn_definition_to_task_definitions_mappings: dict,
+        run_started_at: float | None = None,
     ) -> None:
         self.process_instance = process_instance
         self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
@@ -117,6 +118,8 @@ class TaskService:
         self.json_data_dicts: dict[str, JsonDataDict] = {}
         self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
+        self.run_started_at: float | None = run_started_at
+
     def save_objects_to_database(self, save_process_instance_events: bool = True) -> None:
         db.session.bulk_save_objects(self.bpmn_processes.values())
         db.session.bulk_save_objects(self.task_models.values())
@@ -208,8 +211,11 @@ class TaskService:
             task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
             task_model.end_in_seconds = start_and_end_times["end_in_seconds"]
 
-        # let failed tasks raise and we will log the event then
-        if task_model.state in ["COMPLETED", "CANCELLED"]:
+        # let failed tasks raise and we will log the event then.
+        # avoid creating events for the same state transition multiple times to avoid multiple cancelled events
+        if task_model.state in ["COMPLETED", "CANCELLED"] and (
+            self.run_started_at is None or spiff_task.last_state_change >= self.run_started_at
+        ):
             event_type = ProcessInstanceEventType.task_completed.value
             if task_model.state == "CANCELLED":
                 event_type = ProcessInstanceEventType.task_cancelled.value
@@ -438,7 +444,6 @@ class TaskService:
             if spiff_task.has_state(TaskState.PREDICTED_MASK):
                 self.__class__.remove_spiff_task_from_parent(spiff_task, self.task_models)
                 continue
-
             task_model = TaskModel.query.filter_by(guid=task_id).first()
             if task_model is None:
                 task_model = self.__class__._create_task(


@@ -201,12 +201,11 @@ class TaskModelSavingDelegate(EngineStepDelegate):
         self.spiff_tasks_to_process: set[UUID] = set()
         self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}
 
-        self.run_started_at = time.time()
-
         self.task_service = TaskService(
             process_instance=self.process_instance,
             serializer=self.serializer,
             bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
+            run_started_at=time.time(),
         )
 
     def will_complete_task(self, spiff_task: SpiffTask) -> None:
@@ -252,6 +251,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
             # 1ead87b4b496525df8cc0e27836c3e987d593dc0 if you are curious.
             for waiting_spiff_task in bpmn_process_instance.get_tasks(
                 state=TaskState.WAITING
+                | TaskState.CANCELLED
                 | TaskState.READY
                 | TaskState.MAYBE
                 | TaskState.LIKELY
@@ -261,21 +261,6 @@ class TaskModelSavingDelegate(EngineStepDelegate):
             ):
                 self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
-
-            # FIXME: this may have broken error boundary events getting cancelled.
-            # Getting all cancelled tasks to see if that fixes it
-            #
-            # only process cancelled tasks that were cancelled during this run
-            # NOTE: this could mean we do not add task models that we should be adding
-            # in which case we may have to remove the updated_ts filter here and
-            # instead just avoid creating the event in update_task_model_with_spiff_task
-            cancelled_spiff_tasks = bpmn_process_instance.get_tasks(
-                state=TaskState.CANCELLED  # , updated_ts=self.run_started_at
-            )
-            for cancelled_spiff_task in cancelled_spiff_tasks:
-                self.task_service.update_task_model_with_spiff_task(
-                    spiff_task=cancelled_spiff_task,
-                )
 
             self.task_service.save_objects_to_database()
 
         if self.secondary_engine_step_delegate:
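
Taken together, the delegate now folds TaskState.CANCELLED into the one state mask passed to get_tasks and leaves duplicate-event suppression to TaskService. SpiffWorkflow task states are bitmask flags, which is why OR-ing CANCELLED into the mask picks cancelled tasks up in the same pass; a small illustration with stand-in flags (State here is illustrative, not the library's actual TaskState):

from enum import IntFlag, auto

# Illustrative bitmask states; SpiffWorkflow's TaskState combines the same way.
class State(IntFlag):
    WAITING = auto()
    READY = auto()
    MAYBE = auto()
    LIKELY = auto()
    CANCELLED = auto()
    COMPLETED = auto()

mask = State.WAITING | State.CANCELLED | State.READY | State.MAYBE | State.LIKELY
assert State.CANCELLED & mask        # cancelled tasks now match the filter
assert not (State.COMPLETED & mask)  # completed tasks still do not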