diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py index 3e2191c8a..3d216dc6c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py @@ -69,6 +69,9 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg # TODO: polling intervals for different jobs polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"] + user_input_required_polling_interval_in_seconds = app.config[ + "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS" + ] # TODO: add job to release locks to simplify other queries # TODO: add job to delete completed entires # TODO: add job to run old/low priority instances so they do not get drowned out @@ -86,7 +89,7 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg scheduler.add_job( BackgroundProcessingService(app).process_user_input_required_process_instances, "interval", - seconds=120, + seconds=user_input_required_polling_interval_in_seconds, ) scheduler.start() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 2af3e7df0..1805e8af2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -18,12 +18,21 @@ SPIFFWORKFLOW_BACKEND_CORS_ALLOW_ORIGINS = re.split( SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER = ( environ.get("SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER", default="false") == "true" ) +SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS = ( + environ.get("SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS", default="true") == "true" +) SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int( environ.get( "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS", default="10", ) ) +SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS = int( + environ.get( + "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS", + default="120", + ) +) SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND = environ.get( "SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND", default="http://localhost:7001" ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 45e83d7c6..4daabd588 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -11,6 +11,7 @@ from urllib.parse import unquote import sentry_sdk from flask import current_app +from SpiffWorkflow.bpmn.specs.events.IntermediateEvent import _BoundaryEventParent # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from spiffworkflow_backend import db @@ -81,8 +82,25 @@ class ProcessInstanceService: process_model = ProcessModelService.get_process_model(process_model_identifier) return cls.create_process_instance(process_model, user) - @staticmethod - def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None: + @classmethod + def ready_user_task_has_associated_timer(cls, processor: ProcessInstanceProcessor) -> bool: + for ready_user_task in processor.bpmn_process_instance.get_ready_user_tasks(): + if isinstance(ready_user_task.parent.task_spec, _BoundaryEventParent): + return True + return False + + @classmethod + def can_optimistically_skip(cls, processor: ProcessInstanceProcessor, status_value: str) -> bool: + if not current_app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS"]: + return False + + if processor.process_instance_model.status != status_value: + return True + + return status_value == "user_input_required" and not cls.ready_user_task_has_associated_timer(processor) + + @classmethod + def do_waiting(cls, status_value: str = ProcessInstanceStatus.waiting.value) -> None: """Do_waiting.""" process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value) if len(process_instance_ids_to_check) == 0: @@ -100,6 +118,10 @@ class ProcessInstanceService: try: current_app.logger.info(f"Processing process_instance {process_instance.id}") processor = ProcessInstanceProcessor(process_instance) + if cls.can_optimistically_skip(processor, status_value): + current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") + continue + processor.lock_process_instance(process_instance_lock_prefix) locked = True db.session.refresh(process_instance)