Support custom run at time when creating an instance (#270)

This commit is contained in:
jbirddog 2023-05-23 09:52:01 -04:00 committed by GitHub
parent 82c976ff71
commit ca891259f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 70 additions and 19 deletions

View File

@ -70,8 +70,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
"""Start_scheduler."""
scheduler = scheduler_class()
# TODO: polling intervals for different jobs
# TODO: polling intervals for messages job
polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"]
not_started_polling_interval_in_seconds = app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS"
]
user_input_required_polling_interval_in_seconds = app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS"
]
@ -84,6 +87,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
"interval",
seconds=10,
)
scheduler.add_job(
BackgroundProcessingService(app).process_not_started_process_instances,
"interval",
seconds=not_started_polling_interval_in_seconds,
)
scheduler.add_job(
BackgroundProcessingService(app).process_waiting_process_instances,
"interval",

View File

@ -27,6 +27,12 @@ SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int(
default="10",
)
)
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS = int(
environ.get(
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS",
default="30",
)
)
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS = int(
environ.get(
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS",

View File

@ -124,6 +124,7 @@ def process_instance_run(
processor = None
try:
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
processor = ProcessInstanceService.run_process_instance_with_processor(process_instance)
except (
ApiError,

View File

@ -468,6 +468,10 @@ def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow) -> int:
def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[Optional[str], Optional[str], None]:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
# TODO: currently this just redirects back to home if the process has not been started
# need something better to show?
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
with ProcessInstanceQueueService.dequeued(process_instance):
yield from _interstitial_stream(process_instance)

View File

@ -18,11 +18,17 @@ class BackgroundProcessingService:
"""__init__."""
self.app = app
def process_not_started_process_instances(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
ProcessInstanceLockService.set_thread_local_locking_context("bg:notstarted")
ProcessInstanceService.do_waiting(ProcessInstanceStatus.not_started.value)
def process_waiting_process_instances(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
ProcessInstanceLockService.set_thread_local_locking_context("bg:waiting")
ProcessInstanceService.do_waiting()
ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value)
def process_user_input_required_process_instances(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""

View File

@ -31,8 +31,6 @@ class ProcessInstanceQueueService:
def _configure_and_save_queue_entry(
cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel
) -> None:
# TODO: configurable params (priority/run_at)
queue_entry.run_at_in_seconds = round(time.time())
queue_entry.priority = 2
queue_entry.status = process_instance.status
queue_entry.locked_by = None
@ -42,13 +40,18 @@ class ProcessInstanceQueueService:
db.session.commit()
@classmethod
def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel) -> None:
queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id)
def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel, run_at_in_seconds: int) -> None:
queue_entry = ProcessInstanceQueueModel(
process_instance_id=process_instance.id, run_at_in_seconds=run_at_in_seconds
)
cls._configure_and_save_queue_entry(process_instance, queue_entry)
@classmethod
def _enqueue(cls, process_instance: ProcessInstanceModel) -> None:
queue_entry = ProcessInstanceLockService.unlock(process_instance.id)
current_time = round(time.time())
if current_time > queue_entry.run_at_in_seconds:
queue_entry.run_at_in_seconds = current_time
cls._configure_and_save_queue_entry(process_instance, queue_entry)
@classmethod
@ -115,14 +118,16 @@ class ProcessInstanceQueueService:
@classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
status_value: str,
locked_by: Optional[str],
run_at_in_seconds_threshold: int,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold,
)
.all()
)
@ -130,8 +135,23 @@ class ProcessInstanceQueueService:
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
status_value: str,
run_at_in_seconds_threshold: int,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@staticmethod
def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool:
queue_entry = (
db.session.query(ProcessInstanceQueueModel)
.filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id)
.first()
)
if queue_entry is None:
return False
current_time = round(time.time())
return queue_entry.run_at_in_seconds > current_time

View File

@ -77,7 +77,8 @@ class ProcessInstanceService:
)
db.session.add(process_instance_model)
db.session.commit()
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model)
run_at_in_seconds = round(time.time())
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model, run_at_in_seconds)
return process_instance_model
@classmethod
@ -134,9 +135,12 @@ class ProcessInstanceService:
return False
@classmethod
def do_waiting(cls, status_value: str = ProcessInstanceStatus.waiting.value) -> None:
def do_waiting(cls, status_value: str) -> None:
"""Do_waiting."""
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value)
run_at_in_seconds_threshold = round(time.time())
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value, run_at_in_seconds_threshold
)
if len(process_instance_ids_to_check) == 0:
return

View File

@ -305,7 +305,8 @@ class BaseTest:
db.session.add(process_instance)
db.session.commit()
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance)
run_at_in_seconds = round(time.time())
ProcessInstanceQueueService.enqueue_new_process_instance(process_instance, run_at_in_seconds)
return process_instance

View File

@ -1,4 +1,5 @@
"""Test_process_instance_queue_service."""
import time
from contextlib import suppress
from flask.app import Flask
@ -36,7 +37,7 @@ class TestProcessInstanceQueueService(BaseTest):
) -> None:
process_instance = self._create_process_instance()
assert not ProcessInstanceLockService.has_lock(process_instance.id)
queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None)
queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None, round(time.time()))
check_passed = False
for entry in queue_entries:
if entry.process_instance_id == process_instance.id:
@ -51,7 +52,7 @@ class TestProcessInstanceQueueService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
) -> None:
process_instance = self._create_process_instance()
queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started")
queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started", round(time.time()))
assert process_instance.id in queue_entry_ids
def test_can_run_some_code_with_a_dequeued_process_instance(