dequeue in background processor instead of init to avoid lock issues when a read only object is necessary w/ burnettk jbirddog

This commit is contained in:
jasquat 2023-04-11 14:40:38 -04:00
parent 33964a08bb
commit a9fc10aa55
No known key found for this signature in database
4 changed files with 23 additions and 16 deletions

View File

@ -10,6 +10,10 @@ from spiffworkflow_backend.models.process_instance_queue import (
)
class ExpectedLockNotFoundError(Exception):
pass
class ProcessInstanceLockService:
"""TODO: comment."""
@ -49,8 +53,10 @@ class ProcessInstanceLockService:
@classmethod
def unlock(cls, process_instance_id: int) -> ProcessInstanceQueueModel:
ctx = cls.get_thread_local_locking_context()
return ctx["locks"].pop(process_instance_id) # type: ignore
queue_model = cls.try_unlock(process_instance_id)
if queue_model is None:
raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}")
return queue_model
@classmethod
def try_unlock(cls, process_instance_id: int) -> Optional[ProcessInstanceQueueModel]:

View File

@ -419,16 +419,15 @@ class ProcessInstanceProcessor:
# * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow
def __init__(self, process_instance_model: ProcessInstanceModel, validate_only: bool = False) -> None:
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
with ProcessInstanceQueueService.dequeued(process_instance_model):
try:
self.setup_processor_with_process_instance(
process_instance_model=process_instance_model, validate_only=validate_only
)
except Exception as ex:
process_instance_model.status = ProcessInstanceStatus.error.value
db.session.add(process_instance_model)
db.session.commit()
raise ex
try:
self.setup_processor_with_process_instance(
process_instance_model=process_instance_model, validate_only=validate_only
)
except Exception as ex:
process_instance_model.status = ProcessInstanceStatus.error.value
db.session.add(process_instance_model)
db.session.commit()
raise ex
def setup_processor_with_process_instance(
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False

View File

@ -90,9 +90,11 @@ class ProcessInstanceQueueService:
@contextlib.contextmanager
def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]:
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
if not reentering_lock:
# this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
# that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
cls._dequeue(process_instance)
try:
if not reentering_lock:
cls._dequeue(process_instance)
yield
finally:
if not reentering_lock:

View File

@ -113,10 +113,10 @@ class ProcessInstanceService:
.all()
)
for process_instance in records:
processor = None
try:
current_app.logger.info(f"Processing process_instance {process_instance.id}")
processor = ProcessInstanceProcessor(process_instance)
with ProcessInstanceQueueService.dequeued(process_instance):
processor = ProcessInstanceProcessor(process_instance)
if cls.can_optimistically_skip(processor, status_value):
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
continue