Feature/async support on pi run (#1161)

* added execution mode to task submit and pi run calls to run in async or sync mode w/ burnettk

* do not allow requesting async mode without celery w/ burnettk

* attempt to move queue checking for celery and async to same method to avoid confusing methods w/ burnettk

* if requesting synchronous mode then run with greedy as well w/ burnettk

* implemented some coderabbit suggestions w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-05 11:20:54 -05:00 committed by GitHub
parent 966359b125
commit 2be7db73df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 73 additions and 17 deletions

View File

@ -1261,6 +1261,15 @@ paths:
description: Force the process instance to run even if it has already been started.
schema:
type: boolean
- name: execution_mode
in: query
required: false
description: Either run in "synchronous" or "asynchronous" mode.
schema:
type: string
enum:
- synchronous
- asynchronous
post:
operationId: spiffworkflow_backend.routes.process_instances_controller.process_instance_run
summary: Run a process instance
@ -2356,6 +2365,15 @@ paths:
description: Include task data for forms
schema:
type: boolean
- name: execution_mode
in: query
required: false
description: Either run in "synchronous" or "asynchronous" mode.
schema:
type: string
enum:
- synchronous
- asynchronous
get:
tags:
- Tasks

View File

@ -4,6 +4,8 @@ import celery
from flask import current_app
from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -35,9 +37,23 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
# if waiting, check all waiting tasks and see if they are timers. if they are timers, it's not runnable.
def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
    """Queue the process instance to be run by a celery worker, when configuration and execution_mode allow it.

    Returns True if the instance was handed to celery, False if the caller should
    run it in-process instead (synchronous mode requested, or celery disabled).
    Raises ApiError (400) when asynchronous mode is requested but celery is not enabled,
    and ValueError when execution_mode is not a valid ProcessInstanceExecutionMode value.
    """
    if execution_mode:
        # validate the requested mode early; raises ValueError on anything
        # other than "synchronous" / "asynchronous"
        ProcessInstanceExecutionMode(execution_mode)
    if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
        return False
    queue_enabled = queue_enabled_for_process_model(process_instance)
    if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled:
        raise ApiError(
            error_code="async_mode_called_without_celery",
            message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.",
            status_code=400,
        )
    if queue_enabled:
        celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
        return True
    return False

View File

@ -5,3 +5,8 @@ class SpiffEnum(enum.Enum):
@classmethod
def list(cls) -> list[str]:
    """Return the string value of every member of this enum, in definition order."""
    return [member.value for member in cls]
class ProcessInstanceExecutionMode(SpiffEnum):
    """Allowed values for the ``execution_mode`` request parameter on run/submit endpoints."""

    asynchronous = "asynchronous"
    synchronous = "synchronous"

View File

@ -1,3 +1,5 @@
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
# black and ruff are in competition with each other in import formatting so ignore ruff
# ruff: noqa: I001
@ -74,9 +76,10 @@ def process_instance_run(
modified_process_model_identifier: str,
process_instance_id: int,
force_run: bool = False,
execution_mode: str | None = None,
) -> flask.wrappers.Response:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
_process_instance_run(process_instance, force_run=force_run)
_process_instance_run(process_instance, force_run=force_run, execution_mode=execution_mode)
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(process_instance)
process_instance_api_dict = ProcessInstanceApiSchema().dump(process_instance_api)
@ -644,6 +647,7 @@ def _get_process_instance(
def _process_instance_run(
process_instance: ProcessInstanceModel,
force_run: bool = False,
execution_mode: str | None = None,
) -> None:
if process_instance.status != "not_started" and not force_run:
raise ApiError(
@ -654,10 +658,15 @@ def _process_instance_run(
processor = None
try:
if queue_enabled_for_process_model(process_instance):
queue_process_instance_if_appropriate(process_instance)
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
processor, _ = ProcessInstanceService.run_process_instance_with_processor(process_instance)
if not queue_process_instance_if_appropriate(
process_instance, execution_mode=execution_mode
) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor, _ = ProcessInstanceService.run_process_instance_with_processor(
process_instance, execution_strategy_name=execution_strategy_name
)
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,

View File

def task_submit(
    process_instance_id: int,
    task_guid: str,
    body: dict[str, Any],
    execution_mode: str | None = None,
) -> flask.wrappers.Response:
    """Submit user-provided data for a task.

    execution_mode optionally forces "synchronous" or "asynchronous" handling of
    any engine steps triggered by the submission; None uses the default behavior.
    """
    with sentry_sdk.start_span(op="controller_action", description="tasks_controller.task_submit"):
        return _task_submit_shared(process_instance_id, task_guid, body, execution_mode=execution_mode)
def process_instance_progress(
@ -875,6 +876,7 @@ def _task_submit_shared(
process_instance_id: int,
task_guid: str,
body: dict[str, Any],
execution_mode: str | None = None,
) -> flask.wrappers.Response:
principal = _find_principal_or_raise()
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -925,6 +927,7 @@ def _task_submit_shared(
data=body,
user=g.user,
human_task=human_task,
execution_mode=execution_mode,
)
# currently task_model has the potential to be None. This should be removable once

View File

@ -126,6 +126,7 @@ SPIFF_CONFIG[JSONFileDataStore] = JSONFileDataStoreConverter
SPIFF_CONFIG[KKVDataStore] = KKVDataStoreConverter
SPIFF_CONFIG[TypeaheadDataStore] = TypeaheadDataStoreConverter
# Sorry about all this crap. I wanted to move this thing to another file, but
# importing a bunch of types causes circular imports.

View File

@ -19,9 +19,6 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from SpiffWorkflow.util.task import TaskState # type: ignore
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_enabled_for_process_model,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
@ -30,6 +27,7 @@ from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError
from spiffworkflow_backend.exceptions.error import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
@ -464,13 +462,15 @@ class ProcessInstanceService:
)
DeepMerge.merge(spiff_task.data, data)
@staticmethod
@classmethod
def complete_form_task(
cls,
processor: ProcessInstanceProcessor,
spiff_task: SpiffTask,
data: dict[str, Any],
user: UserModel,
human_task: HumanTaskModel,
execution_mode: str | None = None,
) -> None:
"""All the things that need to happen when we complete a form.
@ -481,12 +481,16 @@ class ProcessInstanceService:
# ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store.
processor.complete_task(spiff_task, human_task, user=user)
if queue_enabled_for_process_model(processor.process_instance_model):
queue_process_instance_if_appropriate(processor.process_instance_model)
else:
if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode):
return
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
# maybe move this out once we have the interstitial page since this is here just so we can get the next human task
processor.do_engine_steps(save=True)
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
@staticmethod
def spiff_task_to_api_task(