run the data migrator from the interstitial page code when loading the pi show page w/ burnettk (#607)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-11-07 16:20:47 -05:00 committed by GitHub
parent eb13ab843b
commit 57c8112eea
2 changed files with 13 additions and 3 deletions

View File

@ -29,6 +29,8 @@ def benchmark_log_func(func: Any) -> Any:
class ProcessInstanceMigrator: class ProcessInstanceMigrator:
CURRENT_VERSION = "3"
@classmethod @classmethod
def run(cls, process_instance: ProcessInstanceModel) -> None: def run(cls, process_instance: ProcessInstanceModel) -> None:
"""This updates the serialization of an instance to the current expected state. """This updates the serialization of an instance to the current expected state.
@ -41,6 +43,7 @@ class ProcessInstanceMigrator:
# if the serializer version is None, then we are dealing with a new process instance, # if the serializer version is None, then we are dealing with a new process instance,
# so we do not need to run the migrator # so we do not need to run the migrator
# it will be set to the newest serializer version momentarily.
if process_instance.spiff_serializer_version is None: if process_instance.spiff_serializer_version is None:
return return

View File

@ -536,7 +536,7 @@ def _interstitial_stream(
execute_tasks: bool = True, execute_tasks: bool = True,
is_locked: bool = False, is_locked: bool = False,
) -> Generator[str, str | None, None]: ) -> Generator[str, str | None, None]:
def get_reportable_tasks() -> Any: def get_reportable_tasks(processor: ProcessInstanceProcessor) -> Any:
return processor.bpmn_process_instance.get_tasks( return processor.bpmn_process_instance.get_tasks(
state=TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR state=TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR
) )
@ -554,7 +554,7 @@ def _interstitial_stream(
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
reported_ids = [] # A list of all the ids reported by this endpoint so far. reported_ids = [] # A list of all the ids reported by this endpoint so far.
tasks = get_reportable_tasks() tasks = get_reportable_tasks(processor)
while True: while True:
has_ready_tasks = False has_ready_tasks = False
for spiff_task in tasks: for spiff_task in tasks:
@ -631,7 +631,7 @@ def _interstitial_stream(
]: ]:
break break
tasks = get_reportable_tasks() tasks = get_reportable_tasks(processor)
spiff_task = processor.next_task() spiff_task = processor.next_task()
if spiff_task is not None: if spiff_task is not None:
@ -672,6 +672,13 @@ def _dequeued_interstitial_stream(
except ProcessInstanceIsAlreadyLockedError: except ProcessInstanceIsAlreadyLockedError:
yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True) yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True)
else: else:
# attempt to run the migrator even for a readonly operation if the process instance is not on the newest serializer version
if process_instance.spiff_serializer_version < ProcessInstanceMigrator.CURRENT_VERSION:
try:
with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
except ProcessInstanceIsAlreadyLockedError:
pass
# no reason to get a lock if we are reading only # no reason to get a lock if we are reading only
yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks) yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
except Exception as ex: except Exception as ex: