mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-01-30 19:56:20 +00:00
dequeue in background processor instead of init to avoid lock issues when a read only object is necessary w/ burnettk jbirddog
This commit is contained in:
parent
3dba9690bf
commit
79e59220ff
@ -10,6 +10,10 @@ from spiffworkflow_backend.models.process_instance_queue import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ExpectedLockNotFoundError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ProcessInstanceLockService:
|
class ProcessInstanceLockService:
|
||||||
"""TODO: comment."""
|
"""TODO: comment."""
|
||||||
|
|
||||||
@ -49,8 +53,10 @@ class ProcessInstanceLockService:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def unlock(cls, process_instance_id: int) -> ProcessInstanceQueueModel:
|
def unlock(cls, process_instance_id: int) -> ProcessInstanceQueueModel:
|
||||||
ctx = cls.get_thread_local_locking_context()
|
queue_model = cls.try_unlock(process_instance_id)
|
||||||
return ctx["locks"].pop(process_instance_id) # type: ignore
|
if queue_model is None:
|
||||||
|
raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}")
|
||||||
|
return queue_model
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def try_unlock(cls, process_instance_id: int) -> Optional[ProcessInstanceQueueModel]:
|
def try_unlock(cls, process_instance_id: int) -> Optional[ProcessInstanceQueueModel]:
|
||||||
|
@ -419,16 +419,15 @@ class ProcessInstanceProcessor:
|
|||||||
# * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow
|
# * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow
|
||||||
def __init__(self, process_instance_model: ProcessInstanceModel, validate_only: bool = False) -> None:
|
def __init__(self, process_instance_model: ProcessInstanceModel, validate_only: bool = False) -> None:
|
||||||
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
|
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
|
||||||
with ProcessInstanceQueueService.dequeued(process_instance_model):
|
try:
|
||||||
try:
|
self.setup_processor_with_process_instance(
|
||||||
self.setup_processor_with_process_instance(
|
process_instance_model=process_instance_model, validate_only=validate_only
|
||||||
process_instance_model=process_instance_model, validate_only=validate_only
|
)
|
||||||
)
|
except Exception as ex:
|
||||||
except Exception as ex:
|
process_instance_model.status = ProcessInstanceStatus.error.value
|
||||||
process_instance_model.status = ProcessInstanceStatus.error.value
|
db.session.add(process_instance_model)
|
||||||
db.session.add(process_instance_model)
|
db.session.commit()
|
||||||
db.session.commit()
|
raise ex
|
||||||
raise ex
|
|
||||||
|
|
||||||
def setup_processor_with_process_instance(
|
def setup_processor_with_process_instance(
|
||||||
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
|
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
|
||||||
|
@ -90,9 +90,11 @@ class ProcessInstanceQueueService:
|
|||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]:
|
def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]:
|
||||||
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
|
reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
|
||||||
|
if not reentering_lock:
|
||||||
|
# this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
|
||||||
|
# that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
|
||||||
|
cls._dequeue(process_instance)
|
||||||
try:
|
try:
|
||||||
if not reentering_lock:
|
|
||||||
cls._dequeue(process_instance)
|
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
if not reentering_lock:
|
if not reentering_lock:
|
||||||
|
@ -113,10 +113,10 @@ class ProcessInstanceService:
|
|||||||
.all()
|
.all()
|
||||||
)
|
)
|
||||||
for process_instance in records:
|
for process_instance in records:
|
||||||
processor = None
|
|
||||||
try:
|
try:
|
||||||
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||||
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
if cls.can_optimistically_skip(processor, status_value):
|
if cls.can_optimistically_skip(processor, status_value):
|
||||||
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
||||||
continue
|
continue
|
||||||
|
Loading…
x
Reference in New Issue
Block a user