make sure we have a current processor so we don't return null (#379)

* make sure we have a current processor so we don't return null

* remove sleep

* The background processor now only picks up processes that were last updated more than a minute ago to avoid conflicting with the interstitial page.  With the understanding that we can rmeove this limitation when we can refactor to allow the backend processes to provide updates on what they are doing.

* pyl w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
Dan Funk 2023-07-07 15:49:52 -04:00 committed by GitHub
parent 9df4614e7b
commit 358f867a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 5 deletions

View File

@ -441,14 +441,12 @@ def _interstitial_stream(
# our session has stale results without the rollback.
db.session.rollback()
db.session.refresh(process_instance)
processor = ProcessInstanceProcessor(process_instance)
# if process instance is done or blocked by a human task, then break out
if is_locked and process_instance.status not in ["not_started", "waiting"]:
break
# only get a new processor if we are not executing tasks otherwise we are the ones updating it
processor = ProcessInstanceProcessor(process_instance)
tasks = get_reportable_tasks()
spiff_task = processor.next_task()

View File

@ -115,11 +115,14 @@ class ProcessInstanceQueueService:
status_value: str,
locked_by: str | None,
run_at_in_seconds_threshold: int,
min_age_in_seconds: int = 0,
) -> list[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.updated_at_in_seconds <= round(time.time()) - min_age_in_seconds,
# At least a minute old.
ProcessInstanceQueueModel.locked_by == locked_by,
ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold,
)
@ -131,8 +134,9 @@ class ProcessInstanceQueueService:
cls,
status_value: str,
run_at_in_seconds_threshold: int,
min_age_in_seconds: int = 0,
) -> list[int]:
queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold)
queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold, min_age_in_seconds)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status

View File

@ -187,8 +187,9 @@ class ProcessInstanceService:
@classmethod
def do_waiting(cls, status_value: str) -> None:
run_at_in_seconds_threshold = round(time.time())
min_age_in_seconds = 60 # to avoid conflicts with the interstitial page, we wait 60 seconds before processing
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value, run_at_in_seconds_threshold
status_value, run_at_in_seconds_threshold, min_age_in_seconds
)
if len(process_instance_ids_to_check) == 0:
return