Smaller locking window for the background processor (#183)

This commit is contained in:
jbirddog 2023-03-15 12:32:55 -04:00 committed by GitHub
parent efcc083638
commit 12f0dc5315
2 changed files with 31 additions and 12 deletions

View File

@ -1,5 +1,6 @@
import time
from typing import List
from typing import Optional
from flask import current_app
@ -84,8 +85,33 @@ class ProcessInstanceQueueService:
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod
@classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@classmethod
def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by()
@ -102,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit()
queue_entries = (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
queue_entries = cls.entries_with_status(status_value, locked_by)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many(
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value
)
if len(locked_process_instance_ids) == 0:
if len(process_instance_ids_to_check) == 0:
return
records = (
db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore
.filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all()
)
process_instance_lock_prefix = "Background"