diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml index d6375227..b1814168 100755 --- a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml +++ b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml @@ -1261,6 +1261,15 @@ paths: description: Force the process instance to run even if it has already been started. schema: type: boolean + - name: execution_mode + in: query + required: false + description: Either run in "synchronous" or "asynchronous" mode. + schema: + type: string + enum: + - synchronous + - asynchronous post: operationId: spiffworkflow_backend.routes.process_instances_controller.process_instance_run summary: Run a process instance @@ -2356,6 +2365,15 @@ paths: description: Include task data for forms schema: type: boolean + - name: execution_mode + in: query + required: false + description: Either run in "synchronous" or "asynchronous" mode. + schema: + type: string + enum: + - synchronous + - asynchronous get: tags: - Tasks diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index 0a585b20..095827e2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -4,6 +4,8 @@ import celery from flask import current_app from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN +from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.models.process_instance import ProcessInstanceModel @@ -35,9 +37,23 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable. -def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel) -> bool: - if queue_enabled_for_process_model(process_instance): +def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: + # check if the enum value is valid + if execution_mode: + ProcessInstanceExecutionMode(execution_mode) + + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + return False + + queue_enabled = queue_enabled_for_process_model(process_instance) + if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled: + raise ApiError( + error_code="async_mode_called_without_celery", + message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.", + status_code=400, + ) + + if queue_enabled: celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) return True - return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/helpers/spiff_enum.py b/spiffworkflow-backend/src/spiffworkflow_backend/helpers/spiff_enum.py index 0fe75b0c..90436b99 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/helpers/spiff_enum.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/helpers/spiff_enum.py @@ -5,3 +5,8 @@ class SpiffEnum(enum.Enum): @classmethod def list(cls) -> list[str]: return [el.value for el in cls] + + +class ProcessInstanceExecutionMode(SpiffEnum): + asynchronous = "asynchronous" + synchronous = "synchronous" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 6b8eb212..13910168 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -1,3 +1,5 @@ +from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode + # black and ruff are in competition with each other in import formatting so ignore ruff # ruff: noqa: I001 @@ -74,9 +76,10 @@ def process_instance_run( modified_process_model_identifier: str, process_instance_id: int, force_run: bool = False, + execution_mode: str | None = None, ) -> flask.wrappers.Response: process_instance = _find_process_instance_by_id_or_raise(process_instance_id) - _process_instance_run(process_instance, force_run=force_run) + _process_instance_run(process_instance, force_run=force_run, execution_mode=execution_mode) process_instance_api = ProcessInstanceService.processor_to_process_instance_api(process_instance) process_instance_api_dict = ProcessInstanceApiSchema().dump(process_instance_api) @@ -644,6 +647,7 @@ def _get_process_instance( def _process_instance_run( process_instance: ProcessInstanceModel, force_run: bool = False, + execution_mode: str | None = None, ) -> None: if process_instance.status != "not_started" and not force_run: raise ApiError( @@ -654,10 +658,15 @@ def _process_instance_run( processor = None try: - if queue_enabled_for_process_model(process_instance): - queue_process_instance_if_appropriate(process_instance) - elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance): - processor, _ = ProcessInstanceService.run_process_instance_with_processor(process_instance) + if not queue_process_instance_if_appropriate( + process_instance, execution_mode=execution_mode + ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance): + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" + processor, _ = ProcessInstanceService.run_process_instance_with_processor( + process_instance, execution_strategy_name=execution_strategy_name + ) except ( ApiError, ProcessInstanceIsNotEnqueuedError, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 6a4b690c..46324206 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -557,9 +557,10 @@ def task_submit( process_instance_id: int, task_guid: str, body: dict[str, Any], + execution_mode: str | None = None, ) -> flask.wrappers.Response: with sentry_sdk.start_span(op="controller_action", description="tasks_controller.task_submit"): - return _task_submit_shared(process_instance_id, task_guid, body) + return _task_submit_shared(process_instance_id, task_guid, body, execution_mode=execution_mode) def process_instance_progress( @@ -875,6 +876,7 @@ def _task_submit_shared( process_instance_id: int, task_guid: str, body: dict[str, Any], + execution_mode: str | None = None, ) -> flask.wrappers.Response: principal = _find_principal_or_raise() process_instance = _find_process_instance_by_id_or_raise(process_instance_id) @@ -925,6 +927,7 @@ def _task_submit_shared( data=body, user=g.user, human_task=human_task, + execution_mode=execution_mode, ) # currently task_model has the potential to be None. This should be removable once diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 649f5b46..07fa90fc 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -126,6 +126,7 @@ SPIFF_CONFIG[JSONFileDataStore] = JSONFileDataStoreConverter SPIFF_CONFIG[KKVDataStore] = KKVDataStoreConverter SPIFF_CONFIG[TypeaheadDataStore] = TypeaheadDataStoreConverter + # Sorry about all this crap. I wanted to move this thing to another file, but # importing a bunch of types causes circular imports. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 240da1cf..6c9bfd55 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -19,9 +19,6 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from SpiffWorkflow.util.task import TaskState # type: ignore -from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( - queue_enabled_for_process_model, -) from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_process_instance_if_appropriate, ) @@ -30,6 +27,7 @@ from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError from spiffworkflow_backend.exceptions.error import UserDoesNotHaveAccessToTaskError +from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.human_task import HumanTaskModel @@ -464,13 +462,15 @@ class ProcessInstanceService: ) DeepMerge.merge(spiff_task.data, data) - @staticmethod + @classmethod def complete_form_task( + cls, processor: ProcessInstanceProcessor, spiff_task: SpiffTask, data: dict[str, Any], user: UserModel, human_task: HumanTaskModel, + execution_mode: str | None = None, ) -> None: """All the things that need to happen when we complete a form. @@ -481,12 +481,16 @@ class ProcessInstanceService: # ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store. processor.complete_task(spiff_task, human_task, user=user) - if queue_enabled_for_process_model(processor.process_instance_model): - queue_process_instance_if_appropriate(processor.process_instance_model) - else: + if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode): + return + elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model): with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"): + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" + # maybe move this out once we have the interstitial page since this is here just so we can get the next human task - processor.do_engine_steps(save=True) + processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) @staticmethod def spiff_task_to_api_task(