make sure we have a current processor so we don't return null (#379)

* make sure we have a current processor so we don't return null

* remove sleep

* The background processor now only picks up processes that were last updated more than a minute ago to avoid conflicting with the interstitial page.  With the understanding that we can rmeove this limitation when we can refactor to allow the backend processes to provide updates on what they are doing.

* pyl w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
Dan Funk 2023-07-07 15:49:52 -04:00 committed by GitHub
parent 92bd5d8883
commit 2b6d1c334f
3 changed files with 8 additions and 5 deletions

View File

@ -441,14 +441,12 @@ def _interstitial_stream(
# our session has stale results without the rollback. # our session has stale results without the rollback.
db.session.rollback() db.session.rollback()
db.session.refresh(process_instance) db.session.refresh(process_instance)
processor = ProcessInstanceProcessor(process_instance)
# if process instance is done or blocked by a human task, then break out # if process instance is done or blocked by a human task, then break out
if is_locked and process_instance.status not in ["not_started", "waiting"]: if is_locked and process_instance.status not in ["not_started", "waiting"]:
break break
# only get a new processor if we are not executing tasks otherwise we are the ones updating it
processor = ProcessInstanceProcessor(process_instance)
tasks = get_reportable_tasks() tasks = get_reportable_tasks()
spiff_task = processor.next_task() spiff_task = processor.next_task()

View File

@ -115,11 +115,14 @@ class ProcessInstanceQueueService:
status_value: str, status_value: str,
locked_by: str | None, locked_by: str | None,
run_at_in_seconds_threshold: int, run_at_in_seconds_threshold: int,
min_age_in_seconds: int = 0,
) -> list[ProcessInstanceQueueModel]: ) -> list[ProcessInstanceQueueModel]:
return ( return (
db.session.query(ProcessInstanceQueueModel) db.session.query(ProcessInstanceQueueModel)
.filter( .filter(
ProcessInstanceQueueModel.status == status_value, ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.updated_at_in_seconds <= round(time.time()) - min_age_in_seconds,
# At least a minute old.
ProcessInstanceQueueModel.locked_by == locked_by, ProcessInstanceQueueModel.locked_by == locked_by,
ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold, ProcessInstanceQueueModel.run_at_in_seconds <= run_at_in_seconds_threshold,
) )
@ -131,8 +134,9 @@ class ProcessInstanceQueueService:
cls, cls,
status_value: str, status_value: str,
run_at_in_seconds_threshold: int, run_at_in_seconds_threshold: int,
min_age_in_seconds: int = 0,
) -> list[int]: ) -> list[int]:
queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold) queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold, min_age_in_seconds)
ids_with_status = [entry.process_instance_id for entry in queue_entries] ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status return ids_with_status

View File

@ -187,8 +187,9 @@ class ProcessInstanceService:
@classmethod @classmethod
def do_waiting(cls, status_value: str) -> None: def do_waiting(cls, status_value: str) -> None:
run_at_in_seconds_threshold = round(time.time()) run_at_in_seconds_threshold = round(time.time())
min_age_in_seconds = 60 # to avoid conflicts with the interstitial page, we wait 60 seconds before processing
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many( process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value, run_at_in_seconds_threshold status_value, run_at_in_seconds_threshold, min_age_in_seconds
) )
if len(process_instance_ids_to_check) == 0: if len(process_instance_ids_to_check) == 0:
return return