Merge branch 'main' into deploy-app-dev

This commit is contained in:
danfunk 2023-07-06 16:23:14 -04:00
commit 7be729ff6c

View File

@ -52,6 +52,7 @@ from spiffworkflow_backend.services.authorization_service import HumanTaskNotFou
from spiffworkflow_backend.services.authorization_service import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
@ -369,7 +370,7 @@ def _render_instructions_for_end_user(task_model: TaskModel, extensions: dict |
def _interstitial_stream(
process_instance: ProcessInstanceModel, execute_tasks: bool = True
process_instance: ProcessInstanceModel, execute_tasks: bool = True, is_locked: bool = False
) -> Generator[str, str | None, None]:
def get_reportable_tasks() -> Any:
return processor.bpmn_process_instance.get_tasks(
@ -413,19 +414,42 @@ def _interstitial_stream(
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop?
processor.refresh_waiting_tasks()
except WorkflowTaskException as wfe:
api_error = ApiError.from_workflow_exception(
"engine_steps_error", "Failed to complete an automated task.", exp=wfe
)
yield _render_data("error", api_error)
return
if execute_tasks is False:
break
processor.refresh_waiting_tasks()
# path used by the interstitial page while executing tasks - ie the background processor is not executing them
ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance)
if execute_tasks and ready_engine_task_count == 0:
break
if not execute_tasks:
# path used by the process instance show page to display most recent instructions
if not is_locked:
break
# HACK: db.session.refresh doesn't seem to refresh without rollback or commit so use rollback.
# we are not executing tasks so there shouldn't be anything to write anyway, so no harm in rollback.
# https://stackoverflow.com/a/20361132/6090676
# note that the thing changing the data in this case is probably the background worker,
# and it is definitely committing its changes, but since we have already queried the data,
# our session has stale results without the rollback.
db.session.rollback()
db.session.refresh(process_instance)
# if process instance is done or blocked by a human task, then break out
if is_locked and process_instance.status not in ["not_started", "waiting"]:
break
# only get a new processor if we are not executing tasks otherwise we are the ones updating it
processor = ProcessInstanceProcessor(process_instance)
tasks = get_reportable_tasks()
if ready_engine_task_count == 0:
break # No more tasks to report
spiff_task = processor.next_task()
if spiff_task is not None:
@ -454,14 +478,16 @@ def _dequeued_interstitial_stream(
) -> Generator[str | None, str | None, None]:
try:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
ProcessInstanceProcessor(process_instance)
# TODO: currently this just redirects back to home if the process has not been started
# need something better to show?
if execute_tasks:
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
with ProcessInstanceQueueService.dequeued(process_instance):
yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
try:
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
with ProcessInstanceQueueService.dequeued(process_instance):
yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
except ProcessInstanceIsAlreadyLockedError:
yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True)
else:
# no reason to get a lock if we are reading only
yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)