Smaller locking window for the background processor (#183)

This commit is contained in:
jbirddog 2023-03-15 12:32:55 -04:00 committed by GitHub
parent 738a2e0078
commit 8e0324df63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 12 deletions

View File

@ -1,5 +1,6 @@
import time import time
from typing import List from typing import List
from typing import Optional
from flask import current_app from flask import current_app
@ -84,8 +85,33 @@ class ProcessInstanceQueueService:
ProcessInstanceLockService.lock(process_instance.id, queue_entry) ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod @classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@classmethod
def dequeue_many( def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value, status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]: ) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by() locked_by = ProcessInstanceLockService.locked_by()
@ -102,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit() db.session.commit()
queue_entries = ( queue_entries = cls.entries_with_status(status_value, locked_by)
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries) locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod @staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None: def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting.""" """Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many( process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value status_value
) )
if len(locked_process_instance_ids) == 0: if len(process_instance_ids_to_check) == 0:
return return
records = ( records = (
db.session.query(ProcessInstanceModel) db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore .filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all() .all()
) )
process_instance_lock_prefix = "Background" process_instance_lock_prefix = "Background"