Merge remote-tracking branch 'origin/main' into feature/remove-loop-reset

This commit is contained in:
burnettk 2023-03-15 14:14:50 -04:00
commit eb763342ce
4 changed files with 60 additions and 21 deletions

View File

@ -56,9 +56,18 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS
from spiffworkflow_backend.services.git_service import GitCommandError
from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsAlreadyLockedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsNotEnqueuedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceQueueService,
)
@ -130,7 +139,11 @@ def process_instance_run(
try:
processor.lock_process_instance("Web")
processor.do_engine_steps(save=True)
except ApiError as e:
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
raise e
except Exception as e:
@ -144,7 +157,8 @@ def process_instance_run(
task=task,
) from e
finally:
processor.unlock_process_instance("Web")
if ProcessInstanceLockService.has_lock(process_instance.id):
processor.unlock_process_instance("Web")
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances()

View File

@ -1,5 +1,6 @@
import time
from typing import List
from typing import Optional
from flask import current_app
@ -14,6 +15,10 @@ from spiffworkflow_backend.services.process_instance_lock_service import (
)
class ProcessInstanceIsNotEnqueuedError(Exception):
pass
class ProcessInstanceIsAlreadyLockedError(Exception):
pass
@ -62,21 +67,51 @@ class ProcessInstanceQueueService:
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.first()
)
if queue_entry is None:
raise ProcessInstanceIsNotEnqueuedError(
f"{locked_by} cannot lock process instance {process_instance.id}. It"
" has not been enqueued."
)
if queue_entry.locked_by != locked_by:
raise ProcessInstanceIsAlreadyLockedError(
f"Cannot lock process instance {process_instance.id}. "
"It has already been locked or has not been enqueued."
f"{locked_by} cannot lock process instance {process_instance.id}. "
f"It has already been locked by {queue_entry.locked_by}."
)
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod
@classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@classmethod
def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by()
@ -93,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit()
queue_entries = (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
queue_entries = cls.entries_with_status(status_value, locked_by)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many(
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value
)
if len(locked_process_instance_ids) == 0:
if len(process_instance_ids_to_check) == 0:
return
records = (
db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore
.filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all()
)
process_instance_lock_prefix = "Background"

View File

@ -704,11 +704,8 @@ export default function ProcessInstanceListTable({
setEndToTime('');
setProcessInitiatorSelection(null);
setProcessInitiatorText('');
if (reportMetadata) {
reportMetadata.columns = reportMetadata.columns.filter(
(column) => !column.filterable
);
reportMetadata.filter_by = [];
}
};