Optimisticly skip locking/background processing (#190)
This commit is contained in:
parent
012b43dd72
commit
992648f087
|
@ -69,6 +69,9 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
|
||||||
|
|
||||||
# TODO: polling intervals for different jobs
|
# TODO: polling intervals for different jobs
|
||||||
polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"]
|
polling_interval_in_seconds = app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS"]
|
||||||
|
user_input_required_polling_interval_in_seconds = app.config[
|
||||||
|
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS"
|
||||||
|
]
|
||||||
# TODO: add job to release locks to simplify other queries
|
# TODO: add job to release locks to simplify other queries
|
||||||
# TODO: add job to delete completed entires
|
# TODO: add job to delete completed entires
|
||||||
# TODO: add job to run old/low priority instances so they do not get drowned out
|
# TODO: add job to run old/low priority instances so they do not get drowned out
|
||||||
|
@ -86,7 +89,7 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
BackgroundProcessingService(app).process_user_input_required_process_instances,
|
BackgroundProcessingService(app).process_user_input_required_process_instances,
|
||||||
"interval",
|
"interval",
|
||||||
seconds=120,
|
seconds=user_input_required_polling_interval_in_seconds,
|
||||||
)
|
)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
|
|
@ -18,12 +18,21 @@ SPIFFWORKFLOW_BACKEND_CORS_ALLOW_ORIGINS = re.split(
|
||||||
SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER = (
|
SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER = (
|
||||||
environ.get("SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER", default="false") == "true"
|
environ.get("SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER", default="false") == "true"
|
||||||
)
|
)
|
||||||
|
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS = (
|
||||||
|
environ.get("SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS", default="true") == "true"
|
||||||
|
)
|
||||||
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int(
|
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS = int(
|
||||||
environ.get(
|
environ.get(
|
||||||
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS",
|
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_POLLING_INTERVAL_IN_SECONDS",
|
||||||
default="10",
|
default="10",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS = int(
|
||||||
|
environ.get(
|
||||||
|
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_USER_INPUT_REQUIRED_POLLING_INTERVAL_IN_SECONDS",
|
||||||
|
default="120",
|
||||||
|
)
|
||||||
|
)
|
||||||
SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND = environ.get(
|
SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND = environ.get(
|
||||||
"SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND", default="http://localhost:7001"
|
"SPIFFWORKFLOW_BACKEND_URL_FOR_FRONTEND", default="http://localhost:7001"
|
||||||
)
|
)
|
||||||
|
|
|
@ -11,6 +11,7 @@ from urllib.parse import unquote
|
||||||
|
|
||||||
import sentry_sdk
|
import sentry_sdk
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
from SpiffWorkflow.bpmn.specs.events.IntermediateEvent import _BoundaryEventParent # type: ignore
|
||||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||||
|
|
||||||
from spiffworkflow_backend import db
|
from spiffworkflow_backend import db
|
||||||
|
@ -81,8 +82,25 @@ class ProcessInstanceService:
|
||||||
process_model = ProcessModelService.get_process_model(process_model_identifier)
|
process_model = ProcessModelService.get_process_model(process_model_identifier)
|
||||||
return cls.create_process_instance(process_model, user)
|
return cls.create_process_instance(process_model, user)
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
|
def ready_user_task_has_associated_timer(cls, processor: ProcessInstanceProcessor) -> bool:
|
||||||
|
for ready_user_task in processor.bpmn_process_instance.get_ready_user_tasks():
|
||||||
|
if isinstance(ready_user_task.parent.task_spec, _BoundaryEventParent):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def can_optimistically_skip(cls, processor: ProcessInstanceProcessor, status_value: str) -> bool:
|
||||||
|
if not current_app.config["SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_ALLOW_OPTIMISTIC_CHECKS"]:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if processor.process_instance_model.status != status_value:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return status_value == "user_input_required" and not cls.ready_user_task_has_associated_timer(processor)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def do_waiting(cls, status_value: str = ProcessInstanceStatus.waiting.value) -> None:
|
||||||
"""Do_waiting."""
|
"""Do_waiting."""
|
||||||
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value)
|
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(status_value)
|
||||||
if len(process_instance_ids_to_check) == 0:
|
if len(process_instance_ids_to_check) == 0:
|
||||||
|
@ -100,6 +118,10 @@ class ProcessInstanceService:
|
||||||
try:
|
try:
|
||||||
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
if cls.can_optimistically_skip(processor, status_value):
|
||||||
|
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
||||||
|
continue
|
||||||
|
|
||||||
processor.lock_process_instance(process_instance_lock_prefix)
|
processor.lock_process_instance(process_instance_lock_prefix)
|
||||||
locked = True
|
locked = True
|
||||||
db.session.refresh(process_instance)
|
db.session.refresh(process_instance)
|
||||||
|
|
Loading…
Reference in New Issue