diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py
index ce8a3970b..54305cd99 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py
@@ -70,8 +70,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
     """Start_scheduler."""
     scheduler = scheduler_class()
 
-    # TODO: polling intervals for different jobs
+    # TODO: polling intervals for messages job
     polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"]
+    not_started_polling_interval_in_seconds = app.config[
+        "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS"
+    ]
     user_input_required_polling_interval_in_seconds = app.config[
         "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS"
     ]
@@ -84,6 +87,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
         "interval",
         seconds=10,
     )
+    scheduler.add_job(
+        BackgroundProcessingService(app).process_not_started_process_instances,
+        "interval",
+        seconds=not_started_polling_interval_in_seconds,
+    )
     scheduler.add_job(
         BackgroundProcessingService(app).process_waiting_process_instances,
         "interval",
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
index 08187f7ea..25cbbab2e 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
@@ -27,6 +27,12 @@ SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int(
         default="10",
     )
 )
+SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS = int(
+    environ.get(
+        "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS",
+        default="30",
+    )
+)
 SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS = int(
     environ.get(
         "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS",
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py
index 8eb08a214..647b1357c 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py
@@ -124,7 +124,8 @@ def process_instance_run(
 
     processor = None
     try:
-        processor = ProcessInstanceService.run_process_instance_with_processor(process_instance)
+        if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+            processor = ProcessInstanceService.run_process_instance_with_processor(process_instance)
     except (
         ApiError,
         ProcessInstanceIsNotEnqueuedError,
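Note: the new SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS setting defaults to 30 seconds and drives the new APScheduler interval job added above. Below is a minimal standalone sketch of that wiring pattern, not part of the patch; poll_not_started_instances is a placeholder for the real BackgroundProcessingService job.

    import os
    import time

    from apscheduler.schedulers.background import BackgroundScheduler


    def poll_not_started_instances() -> None:
        # Placeholder for BackgroundProcessingService(app).process_not_started_process_instances.
        print("polling not_started process instances")


    # Read the interval the same way config/default.py does: env var with a "30" second fallback.
    interval = int(
        os.environ.get(
            "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS",
            "30",
        )
    )

    scheduler = BackgroundScheduler()
    scheduler.add_job(poll_not_started_instances, "interval", seconds=interval)
    scheduler.start()
    time.sleep(interval + 1)  # keep the demo process alive long enough for one polling cycle
    scheduler.shutdown()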
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index 2b68942dd..5211f02de 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -468,8 +468,12 @@ def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int:
 
 def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[Optional[str], Optional[str], None]:
     process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
-    with ProcessInstanceQueueService.dequeued(process_instance):
-        yield from _interstitial_stream(process_instance)
+    # TODO: currently this just redirects back to home if the process has not been started
+    # need something better to show?
+
+    if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+        with ProcessInstanceQueueService.dequeued(process_instance):
+            yield from _interstitial_stream(process_instance)
 
 
 def interstitial(process_instance_id: int) -> Response:
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py
index eabff0f9d..38cb0c64a 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py
@@ -18,11 +18,17 @@ class BackgroundProcessingService:
         """__init__."""
         self.app = app
 
+    def process_not_started_process_instances(self) -> None:
+        """Since this runs in a scheduler, we need to specify the app context as well."""
+        with self.app.app_context():
+            ProcessInstanceLockService.set_thread_local_locking_context("bg:notstarted")
+            ProcessInstanceService.do_waiting(ProcessInstanceStatus.not_started.value)
+
     def process_waiting_process_instances(self) -> None:
         """Since this runs in a scheduler, we need to specify the app context as well."""
         with self.app.app_context():
             ProcessInstanceLockService.set_thread_local_locking_context("bg:waiting")
-            ProcessInstanceService.do_waiting()
+            ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value)
 
     def process_user_input_required_process_instances(self) -> None:
         """Since this runs in a scheduler, we need to specify the app context as well."""
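Note: both background jobs now call ProcessInstanceService.do_waiting with an explicit status value instead of relying on a default. The standalone sketch below, not part of the patch, mirrors that dispatch; the ProcessInstanceStatus enum and the do_waiting stub here are simplified stand-ins for the backend's real classes.

    import time
    from enum import Enum


    class ProcessInstanceStatus(Enum):
        # Stand-in for the backend's status enum; only the two values used by the new jobs.
        not_started = "not_started"
        waiting = "waiting"


    def do_waiting(status_value: str) -> None:
        # Mirrors the new signature: the caller must say which queue status to poll.
        threshold = round(time.time())
        print(f"polling queue entries with status={status_value!r} and run_at <= {threshold}")


    def process_not_started_process_instances() -> None:
        do_waiting(ProcessInstanceStatus.not_started.value)


    def process_waiting_process_instances() -> None:
        do_waiting(ProcessInstanceStatus.waiting.value)


    process_not_started_process_instances()
    process_waiting_process_instances()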
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
index ff6e158fa..4e65167d7 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
@@ -31,8 +31,6 @@ class ProcessInstanceQueueService:
     def _configure_and_save_queue_entry(
         cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
     ) -> None:
-        # TODO: configurable params (priority/run_at)
-        queue_entry.run_at_in_seconds = round(time.time())
         queue_entry.priority = 2
         queue_entry.status = process_instance.status
         queue_entry.locked_by = None
@@ -42,13 +40,18 @@ class ProcessInstanceQueueService:
         db.session.commit()
 
     @classmethod
-    def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel) -> None:
-        queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id)
+    def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel, run_at_in_seconds: int) -> None:
+        queue_entry = ProcessInstanceQueueModel(
+            process_instance_id=process_instance.id, run_at_in_seconds=run_at_in_seconds
+        )
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
     def _enqueue(cls, process_instance: ProcessInstanceModel) -> None:
         queue_entry = ProcessInstanceLockService.unlock(process_instance.id)
+        current_time = round(time.time())
+        if current_time > queue_entry.run_at_in_seconds:
+            queue_entry.run_at_in_seconds = current_time
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
@@ -115,14 +118,16 @@ class ProcessInstanceQueueService:
     @classmethod
     def entries_with_status(
         cls,
-        status_value: str = ProcessInstanceStatus.waiting.value,
-        locked_by: Optional[str] = None,
+        status_value: str,
+        locked_by: Optional[str],
+        run_at_in_seconds_threshold: int,
     ) -> List[ProcessInstanceQueueModel]:
         return (
             db.session.query(ProcessInstanceQueueModel)
             .filter(
                 ProcessInstanceQueueModel.status == status_value,
                 ProcessInstanceQueueModel.locked_by == locked_by,
+                ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold,
             )
             .all()
         )
@@ -130,8 +135,23 @@ class ProcessInstanceQueueService:
     @classmethod
     def peek_many(
         cls,
-        status_value: str = ProcessInstanceStatus.waiting.value,
+        status_value: str,
+        run_at_in_seconds_threshold: int,
     ) -> List[int]:
-        queue_entries = cls.entries_with_status(status_value, None)
+        queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold)
         ids_with_status = [entry.process_instance_id for entry in queue_entries]
         return ids_with_status
+
+    @staticmethod
+    def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool:
+        queue_entry = (
+            db.session.query(ProcessInstanceQueueModel)
+            .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id)
+            .first()
+        )
+
+        if queue_entry is None:
+            return False
+
+        current_time = round(time.time())
+        return queue_entry.run_at_in_seconds > current_time
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
index 84c62f12c..0bded4b44 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
@@ -77,7 +77,8 @@ class ProcessInstanceService:
         )
         db.session.add(process_instance_model)
         db.session.commit()
-        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model)
+        run_at_in_seconds = round(time.time())
+        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
         return process_instance_model
 
     @classmethod
@@ -134,9 +135,12 @@ class ProcessInstanceService:
         return False
 
     @classmethod
-    def do_waiting(cls, status_value: str = ProcessInstanceStatus.waiting.value) -> None:
+    def do_waiting(cls, status_value: str) -> None:
         """Do_waiting."""
-        process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value)
+        run_at_in_seconds_threshold = round(time.time())
+        process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
+            status_value, run_at_in_seconds_threshold
+        )
         if len(process_instance_ids_to_check) == 0:
             return
 
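Note: run_at_in_seconds now acts as a visibility threshold. entries_with_status and peek_many only return entries whose run_at_in_seconds is at or below the caller's threshold, _enqueue bumps a past-due run_at up to the current time, and is_enqueued_to_run_in_the_future gates immediate runs in the controllers. The in-memory sketch below, not part of the patch, illustrates that logic with a plain dataclass (QueueEntry) standing in for ProcessInstanceQueueModel.

    import time
    from dataclasses import dataclass
    from typing import List, Optional


    @dataclass
    class QueueEntry:
        # Minimal stand-in for ProcessInstanceQueueModel.
        process_instance_id: int
        status: str
        locked_by: Optional[str]
        run_at_in_seconds: int


    def peek_many(entries: List[QueueEntry], status_value: str, run_at_in_seconds_threshold: int) -> List[int]:
        # Same filter as the new query: matching status, unlocked, and already due to run.
        return [
            e.process_instance_id
            for e in entries
            if e.status == status_value and e.locked_by is None and e.run_at_in_seconds <= run_at_in_seconds_threshold
        ]


    def is_enqueued_to_run_in_the_future(entry: QueueEntry) -> bool:
        return entry.run_at_in_seconds > round(time.time())


    now = round(time.time())
    queue = [
        QueueEntry(1, "not_started", None, now),         # due now: returned by peek_many
        QueueEntry(2, "not_started", None, now + 3600),  # scheduled an hour out: skipped
    ]
    print(peek_many(queue, "not_started", now))          # [1]
    print(is_enqueued_to_run_in_the_future(queue[1]))    # True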
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py
index c2768d2d4..487fb0c76 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py
@@ -305,7 +305,8 @@ class BaseTest:
         db.session.add(process_instance)
         db.session.commit()
 
-        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance)
+        run_at_in_seconds = round(time.time())
+        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds)
 
         return process_instance
 
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py
index f676479ff..0c455cc54 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py
@@ -1,4 +1,5 @@
 """Test_process_instance_queue_service."""
+import time
 from contextlib import suppress
 
 from flask.app import Flask
@@ -36,7 +37,7 @@ class TestProcessInstanceQueueService(BaseTest):
     ) -> None:
         process_instance = self._create_process_instance()
         assert not ProcessInstanceLockService.has_lock(process_instance.id)
-        queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None)
+        queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None, round(time.time()))
         check_passed = False
         for entry in queue_entries:
             if entry.process_instance_id == process_instance.id:
@@ -51,7 +52,7 @@ class TestProcessInstanceQueueService(BaseTest):
         with_db_and_bpmn_file_cleanup: None,
     ) -> None:
         process_instance = self._create_process_instance()
-        queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started")
+        queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started", round(time.time()))
         assert process_instance.id in queue_entry_ids
 
     def test_can_run_some_code_with_a_dequeued_process_instance(
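Note: every call site in this diff passes run_at_in_seconds = round(time.time()), so existing behavior (run immediately) is preserved; a future timestamp is what would delay pickup by the new not_started background job. The helper below is hypothetical and not part of the patch, shown only to make the intended timestamp arithmetic concrete.

    import time


    def run_at_for_delay(delay_in_seconds: int = 0) -> int:
        # All call sites in this diff effectively pass a delay of zero (run immediately);
        # a positive delay would defer pickup until the timestamp passes.
        return round(time.time()) + delay_in_seconds


    print(run_at_for_delay())      # eligible immediately, matching the diff's call sites
    print(run_at_for_delay(3600))  # hypothetical: eligible one hour from now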