feature/detached-instance-error (#847)
* store the db ids of the process instance queue records instead of the sqlalchemy objects to avoid detached instance errors w/ burnettk * raise an error similar to one we raise elsewhere in the unexpected case that this fails * removed unused method --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com> Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
parent
0da550d412
commit
14846bfb86
|
@ -46,29 +46,17 @@ class ProcessInstanceLockService:
|
|||
cls, process_instance_id: int, queue_entry: ProcessInstanceQueueModel, additional_processing_identifier: str | None = None
|
||||
) -> None:
|
||||
ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
|
||||
ctx["locks"][process_instance_id] = queue_entry
|
||||
ctx["locks"][process_instance_id] = queue_entry.id
|
||||
|
||||
@classmethod
|
||||
def lock_many(
|
||||
cls, queue_entries: list[ProcessInstanceQueueModel], additional_processing_identifier: str | None = None
|
||||
) -> list[int]:
|
||||
ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
|
||||
new_locks = {entry.process_instance_id: entry for entry in queue_entries}
|
||||
new_lock_ids = list(new_locks.keys())
|
||||
ctx["locks"].update(new_locks)
|
||||
return new_lock_ids
|
||||
|
||||
@classmethod
|
||||
def unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> ProcessInstanceQueueModel:
|
||||
queue_model = cls.try_unlock(process_instance_id, additional_processing_identifier=additional_processing_identifier)
|
||||
if queue_model is None:
|
||||
def unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int:
|
||||
queue_model_id = cls.try_unlock(process_instance_id, additional_processing_identifier=additional_processing_identifier)
|
||||
if queue_model_id is None:
|
||||
raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}")
|
||||
return queue_model
|
||||
return queue_model_id
|
||||
|
||||
@classmethod
|
||||
def try_unlock(
|
||||
cls, process_instance_id: int, additional_processing_identifier: str | None = None
|
||||
) -> ProcessInstanceQueueModel | None:
|
||||
def try_unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int | None:
|
||||
ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
|
||||
return ctx["locks"].pop(process_instance_id, None) # type: ignore
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
|
||||
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
|
||||
from spiffworkflow_backend.services.process_instance_lock_service import ExpectedLockNotFoundError
|
||||
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
|
||||
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
|
||||
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
|
||||
|
@ -40,9 +41,12 @@ class ProcessInstanceQueueService:
|
|||
|
||||
@classmethod
|
||||
def _enqueue(cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None) -> None:
|
||||
queue_entry = ProcessInstanceLockService.unlock(
|
||||
queue_entry_id = ProcessInstanceLockService.unlock(
|
||||
process_instance.id, additional_processing_identifier=additional_processing_identifier
|
||||
)
|
||||
queue_entry = ProcessInstanceQueueModel.query.filter_by(id=queue_entry_id).first()
|
||||
if queue_entry is None:
|
||||
raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance.id}")
|
||||
current_time = round(time.time())
|
||||
if current_time > queue_entry.run_at_in_seconds:
|
||||
queue_entry.run_at_in_seconds = current_time
|
||||
|
|
Loading…
Reference in New Issue