diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py
index cf49d41cc..343f93dbd 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py
@@ -29,6 +29,8 @@ def benchmark_log_func(func: Any) -> Any:
 
 
 class ProcessInstanceMigrator:
+    CURRENT_VERSION = "3"
+
     @classmethod
     def run(cls, process_instance: ProcessInstanceModel) -> None:
         """This updates the serialization of an instance to the current expected state.
@@ -41,6 +43,7 @@ class ProcessInstanceMigrator:
 
         # if the serializer version is None, then we are dealing with a new process instance,
         # so we do not need to run the migrator
+        # it will be set to the newest serializer version momentarily.
         if process_instance.spiff_serializer_version is None:
             return
 
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index 8b25d04fd..837ed9c26 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -536,7 +536,7 @@ def _interstitial_stream(
     execute_tasks: bool = True,
     is_locked: bool = False,
 ) -> Generator[str, str | None, None]:
-    def get_reportable_tasks() -> Any:
+    def get_reportable_tasks(processor: ProcessInstanceProcessor) -> Any:
         return processor.bpmn_process_instance.get_tasks(
             state=TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR
         )
@@ -554,7 +554,7 @@ def _interstitial_stream(
         processor = ProcessInstanceProcessor(process_instance)
 
     reported_ids = []  # A list of all the ids reported by this endpoint so far.
-    tasks = get_reportable_tasks()
+    tasks = get_reportable_tasks(processor)
     while True:
         has_ready_tasks = False
         for spiff_task in tasks:
@@ -631,7 +631,7 @@ def _interstitial_stream(
         ]:
             break
 
-        tasks = get_reportable_tasks()
+        tasks = get_reportable_tasks(processor)
         spiff_task = processor.next_task()
         if spiff_task is not None:
@@ -672,6 +672,13 @@ def _dequeued_interstitial_stream(
             except ProcessInstanceIsAlreadyLockedError:
                 yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True)
         else:
+            # attempt to run the migrator even for a readonly operation if the process instance is not newest
+            if process_instance.spiff_serializer_version < ProcessInstanceMigrator.CURRENT_VERSION:
+                try:
+                    with ProcessInstanceQueueService.dequeued(process_instance):
+                        ProcessInstanceMigrator.run(process_instance)
+                except ProcessInstanceIsAlreadyLockedError:
+                    pass  # no reason to get a lock if we are reading only
             yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
     except Exception as ex:
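
Below is a minimal, self-contained sketch (not part of the patch) of the read-only migration guard added in the last hunk of tasks_controller.py. StubMigrator, StubQueueService, AlreadyLockedError, Instance, and migrate_if_stale are hypothetical stand-ins, not the real spiffworkflow_backend classes; the sketch only illustrates the pattern: migrate when the stored serializer version is behind CURRENT_VERSION, take the per-instance lock if it is free, and skip silently if another worker holds it.

# Illustrative sketch only -- hypothetical stand-ins, not the real
# spiffworkflow_backend services.
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator


class AlreadyLockedError(Exception):
    """Raised when another worker already holds the instance lock."""


@dataclass
class Instance:
    spiff_serializer_version: str


class StubMigrator:
    CURRENT_VERSION = "3"

    @classmethod
    def run(cls, instance: Instance) -> None:
        # Stand-in for the migrator: bring the instance up to date.
        instance.spiff_serializer_version = cls.CURRENT_VERSION


class StubQueueService:
    @staticmethod
    @contextmanager
    def dequeued(instance: Instance) -> Iterator[None]:
        # Stand-in for the queue service: acquire and release the
        # per-instance lock around the body of the with-block.
        yield


def migrate_if_stale(instance: Instance) -> None:
    # Same shape as the new else-branch: only migrate when the stored
    # version lags CURRENT_VERSION, and tolerate a lock held by another
    # worker since this caller is read-only.
    if instance.spiff_serializer_version < StubMigrator.CURRENT_VERSION:
        try:
            with StubQueueService.dequeued(instance):
                StubMigrator.run(instance)
        except AlreadyLockedError:
            pass  # read-only caller; serving the old serialization is fine


instance = Instance(spiff_serializer_version="2")
migrate_if_stale(instance)
assert instance.spiff_serializer_version == "3"

Swallowing the already-locked error rather than blocking keeps the read-only interstitial stream responsive; if another worker holds the lock, the old serialization is served and the migration can happen on a later call.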