Merge remote-tracking branch 'origin/main' into feature/remove-loop-reset

This commit is contained in:
burnettk 2023-03-15 14:14:50 -04:00
commit b9153f77e7
No known key found for this signature in database
4 changed files with 60 additions and 21 deletions

View File

@ -56,9 +56,18 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS
from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitCommandError
from spiffworkflow_backend.services.git_service import GitService from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsAlreadyLockedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsNotEnqueuedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import ( from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceQueueService, ProcessInstanceQueueService,
) )
@ -130,7 +139,11 @@ def process_instance_run(
try: try:
processor.lock_process_instance("Web") processor.lock_process_instance("Web")
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
except ApiError as e: except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e) ErrorHandlingService().handle_error(processor, e)
raise e raise e
except Exception as e: except Exception as e:
@ -144,7 +157,8 @@ def process_instance_run(
task=task, task=task,
) from e ) from e
finally: finally:
processor.unlock_process_instance("Web") if ProcessInstanceLockService.has_lock(process_instance.id):
processor.unlock_process_instance("Web")
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances() MessageService.correlate_all_message_instances()

View File

@ -1,5 +1,6 @@
import time import time
from typing import List from typing import List
from typing import Optional
from flask import current_app from flask import current_app
@ -14,6 +15,10 @@ from spiffworkflow_backend.services.process_instance_lock_service import (
) )
class ProcessInstanceIsNotEnqueuedError(Exception):
pass
class ProcessInstanceIsAlreadyLockedError(Exception): class ProcessInstanceIsAlreadyLockedError(Exception):
pass pass
@ -62,21 +67,51 @@ class ProcessInstanceQueueService:
db.session.query(ProcessInstanceQueueModel) db.session.query(ProcessInstanceQueueModel)
.filter( .filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id, ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by == locked_by,
) )
.first() .first()
) )
if queue_entry is None: if queue_entry is None:
raise ProcessInstanceIsNotEnqueuedError(
f"{locked_by} cannot lock process instance {process_instance.id}. It"
" has not been enqueued."
)
if queue_entry.locked_by != locked_by:
raise ProcessInstanceIsAlreadyLockedError( raise ProcessInstanceIsAlreadyLockedError(
f"Cannot lock process instance {process_instance.id}. " f"{locked_by} cannot lock process instance {process_instance.id}. "
"It has already been locked or has not been enqueued." f"It has already been locked by {queue_entry.locked_by}."
) )
ProcessInstanceLockService.lock(process_instance.id, queue_entry) ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod @classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@classmethod
def dequeue_many( def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value, status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]: ) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by() locked_by = ProcessInstanceLockService.locked_by()
@ -93,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit() db.session.commit()
queue_entries = ( queue_entries = cls.entries_with_status(status_value, locked_by)
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries) locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod @staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None: def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting.""" """Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many( process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value status_value
) )
if len(locked_process_instance_ids) == 0: if len(process_instance_ids_to_check) == 0:
return return
records = ( records = (
db.session.query(ProcessInstanceModel) db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore .filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all() .all()
) )
process_instance_lock_prefix = "Background" process_instance_lock_prefix = "Background"

View File

@ -704,11 +704,8 @@ export default function ProcessInstanceListTable({
setEndToTime(''); setEndToTime('');
setProcessInitiatorSelection(null); setProcessInitiatorSelection(null);
setProcessInitiatorText(''); setProcessInitiatorText('');
if (reportMetadata) { if (reportMetadata) {
reportMetadata.columns = reportMetadata.columns.filter( reportMetadata.filter_by = [];
(column) => !column.filterable
);
} }
}; };