Support custom run at time when creating an instance (#270)

jbirddog 2023-05-23 09:52:01 -04:00 committed by GitHub
parent ba81019257
commit 9ec956cee8
9 changed files with 70 additions and 19 deletions

View File

@@ -70,8 +70,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
     """Start_scheduler."""
     scheduler = scheduler_class()
-    # TODO: polling intervals for different jobs
+    # TODO: polling intervals for messages job
     polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"]
+    not_started_polling_interval_in_seconds = app.config[
+        "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS"
+    ]
     user_input_required_polling_interval_in_seconds = app.config[
         "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS"
     ]
@@ -84,6 +87,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
         "interval",
         seconds=10,
     )
+    scheduler.add_job(
+        BackgroundProcessingService(app).process_not_started_process_instances,
+        "interval",
+        seconds=not_started_polling_interval_in_seconds,
+    )
     scheduler.add_job(
         BackgroundProcessingService(app).process_waiting_process_instances,
         "interval",

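For reference, the new job is wired up the same way as the existing ones: an APScheduler interval job whose period comes from application config. Below is a minimal standalone sketch of that pattern, assuming nothing beyond APScheduler itself; the print callback stands in for BackgroundProcessingService(app).process_not_started_process_instances, and the environment variable is read directly rather than through app.config.

import os
import time

from apscheduler.schedulers.background import BackgroundScheduler


def poll_not_started_instances() -> None:
    # Placeholder for BackgroundProcessingService(app).process_not_started_process_instances.
    print("polling not_started process instances")


# Same default the config file below uses for the new interval: 30 seconds.
interval_in_seconds = int(
    os.environ.get(
        "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS",
        "30",
    )
)

scheduler = BackgroundScheduler()
scheduler.add_job(poll_not_started_instances, "interval", seconds=interval_in_seconds)
scheduler.start()

try:
    time.sleep(65)  # keep the process alive long enough for the job to fire
finally:
    scheduler.shutdown()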
View File

@@ -27,6 +27,12 @@ SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int(
         default="10",
     )
 )
+SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS = int(
+    environ.get(
+        "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS",
+        default="30",
+    )
+)
 SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS = int(
     environ.get(
         "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS",

View File

@@ -124,6 +124,7 @@ def process_instance_run(
     processor = None
     try:
-        processor = ProcessInstanceService.run_process_instance_with_processor(process_instance)
+        if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+            processor = ProcessInstanceService.run_process_instance_with_processor(process_instance)
     except (
         ApiError,

View File

@@ -468,6 +468,10 @@ def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int:
 def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[Optional[str], Optional[str], None]:
     process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
-    with ProcessInstanceQueueService.dequeued(process_instance):
-        yield from _interstitial_stream(process_instance)
+    # TODO: currently this just redirects back to home if the process has not been started
+    # need something better to show?
+    if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+        with ProcessInstanceQueueService.dequeued(process_instance):
+            yield from _interstitial_stream(process_instance)

View File

@@ -18,11 +18,17 @@ class BackgroundProcessingService:
         """__init__."""
         self.app = app
 
+    def process_not_started_process_instances(self) -> None:
+        """Since this runs in a scheduler, we need to specify the app context as well."""
+        with self.app.app_context():
+            ProcessInstanceLockService.set_thread_local_locking_context("bg:notstarted")
+            ProcessInstanceService.do_waiting(ProcessInstanceStatus.not_started.value)
+
     def process_waiting_process_instances(self) -> None:
         """Since this runs in a scheduler, we need to specify the app context as well."""
         with self.app.app_context():
             ProcessInstanceLockService.set_thread_local_locking_context("bg:waiting")
-            ProcessInstanceService.do_waiting()
+            ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value)
 
     def process_user_input_required_process_instances(self) -> None:
         """Since this runs in a scheduler, we need to specify the app context as well."""

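The new method mirrors process_waiting_process_instances: enter the Flask app context (the scheduler thread runs outside any request), tag locks with a status-specific context string, then delegate to do_waiting with that status. A hedged sketch of that shape, with print placeholders instead of the real ProcessInstanceLockService/ProcessInstanceService calls:

from typing import Callable

from flask import Flask

app = Flask(__name__)


def make_status_poller(status_value: str, locking_context: str) -> Callable[[], None]:
    """Build a scheduler callback for one process-instance status (sketch only)."""

    def poll() -> None:
        # Scheduler callbacks run outside a request, so push an app context first.
        with app.app_context():
            # Real code: ProcessInstanceLockService.set_thread_local_locking_context(locking_context)
            #            ProcessInstanceService.do_waiting(status_value)
            print(f"polling {status_value} instances under locking context {locking_context}")

    return poll


poll_not_started = make_status_poller("not_started", "bg:notstarted")
poll_waiting = make_status_poller("waiting", "bg:waiting")
poll_not_started()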
View File

@@ -31,8 +31,6 @@ class ProcessInstanceQueueService:
     def _configure_and_save_queue_entry(
         cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
     ) -> None:
-        # TODO: configurable params (priority/run_at)
-        queue_entry.run_at_in_seconds = round(time.time())
         queue_entry.priority = 2
         queue_entry.status = process_instance.status
         queue_entry.locked_by = None
@@ -42,13 +40,18 @@ class ProcessInstanceQueueService:
         db.session.commit()
 
     @classmethod
-    def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel) -> None:
-        queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id)
+    def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel, run_at_in_seconds: int) -> None:
+        queue_entry = ProcessInstanceQueueModel(
+            process_instance_id=process_instance.id, run_at_in_seconds=run_at_in_seconds
+        )
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
     def _enqueue(cls, process_instance: ProcessInstanceModel) -> None:
         queue_entry = ProcessInstanceLockService.unlock(process_instance.id)
+        current_time = round(time.time())
+        if current_time > queue_entry.run_at_in_seconds:
+            queue_entry.run_at_in_seconds = current_time
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
@@ -115,14 +118,16 @@ class ProcessInstanceQueueService:
     @classmethod
     def entries_with_status(
         cls,
-        status_value: str = ProcessInstanceStatus.waiting.value,
-        locked_by: Optional[str] = None,
+        status_value: str,
+        locked_by: Optional[str],
+        run_at_in_seconds_threshold: int,
     ) -> List[ProcessInstanceQueueModel]:
         return (
             db.session.query(ProcessInstanceQueueModel)
             .filter(
                 ProcessInstanceQueueModel.status == status_value,
                 ProcessInstanceQueueModel.locked_by == locked_by,
+                ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold,
             )
             .all()
         )
@@ -130,8 +135,23 @@ class ProcessInstanceQueueService:
     @classmethod
     def peek_many(
         cls,
-        status_value: str = ProcessInstanceStatus.waiting.value,
+        status_value: str,
+        run_at_in_seconds_threshold: int,
     ) -> List[int]:
-        queue_entries = cls.entries_with_status(status_value, None)
+        queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold)
         ids_with_status = [entry.process_instance_id for entry in queue_entries]
         return ids_with_status
+
+    @staticmethod
+    def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool:
+        queue_entry = (
+            db.session.query(ProcessInstanceQueueModel)
+            .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id)
+            .first()
+        )
+
+        if queue_entry is None:
+            return False
+
+        current_time = round(time.time())
+
+        return queue_entry.run_at_in_seconds > current_time

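To make the scheduling rule concrete, here is a small in-memory model of the two checks introduced above, using a plain dataclass in place of ProcessInstanceQueueModel and a list in place of the database session: peek_many-style filtering only surfaces entries whose run_at_in_seconds has already passed, while is_enqueued_to_run_in_the_future flags entries scheduled beyond the current time.

import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class QueueEntry:
    """In-memory stand-in for ProcessInstanceQueueModel."""

    process_instance_id: int
    status: str
    run_at_in_seconds: int
    locked_by: Optional[str] = None


def peek_many(entries: List[QueueEntry], status_value: str, run_at_in_seconds_threshold: int) -> List[int]:
    # Mirrors the new filter: only entries whose run-at time has arrived are returned.
    return [
        e.process_instance_id
        for e in entries
        if e.status == status_value
        and e.locked_by is None
        and e.run_at_in_seconds <= run_at_in_seconds_threshold
    ]


def is_enqueued_to_run_in_the_future(entries: List[QueueEntry], process_instance_id: int) -> bool:
    entry = next((e for e in entries if e.process_instance_id == process_instance_id), None)
    if entry is None:
        return False
    return entry.run_at_in_seconds > round(time.time())


now = round(time.time())
entries = [
    QueueEntry(1, "not_started", now),         # eligible as soon as the poller runs
    QueueEntry(2, "not_started", now + 3600),  # custom run-at time an hour from now
]
assert peek_many(entries, "not_started", now) == [1]
assert is_enqueued_to_run_in_the_future(entries, 2)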
View File

@@ -77,7 +77,8 @@ class ProcessInstanceService:
         )
         db.session.add(process_instance_model)
         db.session.commit()
-        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model)
+        run_at_in_seconds = round(time.time())
+        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
         return process_instance_model
 
     @classmethod
@@ -134,9 +135,12 @@ class ProcessInstanceService:
         return False
 
     @classmethod
-    def do_waiting(cls, status_value: str = ProcessInstanceStatus.waiting.value) -> None:
+    def do_waiting(cls, status_value: str) -> None:
         """Do_waiting."""
-        process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value)
+        run_at_in_seconds_threshold = round(time.time())
+        process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
+            status_value, run_at_in_seconds_threshold
+        )
         if len(process_instance_ids_to_check) == 0:
             return

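The callers shown in this commit all pass round(time.time()), which preserves the old run-immediately behavior; the point of the new run_at_in_seconds parameter (per the commit title) is that a creator can supply a later timestamp instead. A tiny sketch of that intended usage, with the actual service call left as a comment because it needs a Flask app context and a database session:

import time

# Hypothetical delay: make a newly created instance eligible one hour from now
# instead of immediately.
run_at_in_seconds = round(time.time()) + 60 * 60

# Real call (requires app context, db session, and a ProcessInstanceModel):
# ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
print(f"instance would first become eligible at epoch second {run_at_in_seconds}")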
View File

@@ -305,7 +305,8 @@ class BaseTest:
         db.session.add(process_instance)
         db.session.commit()
-        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance)
+        run_at_in_seconds = round(time.time())
+        ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds)
         return process_instance

View File

@@ -1,4 +1,5 @@
 """Test_process_instance_queue_service."""
+import time
 from contextlib import suppress
 
 from flask.app import Flask
@@ -36,7 +37,7 @@ class TestProcessInstanceQueueService(BaseTest):
     ) -> None:
         process_instance = self._create_process_instance()
         assert not ProcessInstanceLockService.has_lock(process_instance.id)
-        queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None)
+        queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None, round(time.time()))
         check_passed = False
         for entry in queue_entries:
             if entry.process_instance_id == process_instance.id:
@@ -51,7 +52,7 @@ class TestProcessInstanceQueueService(BaseTest):
         with_db_and_bpmn_file_cleanup: None,
     ) -> None:
         process_instance = self._create_process_instance()
-        queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started")
+        queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started", round(time.time()))
         assert process_instance.id in queue_entry_ids
 
     def test_can_run_some_code_with_a_dequeued_process_instance(