Merge remote-tracking branch 'origin/main' into feature/add_task_definition_to_task

This commit is contained in:
jasquat 2023-03-15 16:11:57 -04:00
commit a0eb492cd5
6 changed files with 107 additions and 63 deletions

View File

@ -913,6 +913,12 @@ paths:
description: If set will return the tasks as they were during a specific step of execution. description: If set will return the tasks as they were during a specific step of execution.
schema: schema:
type: integer type: integer
- name: most_recent_tasks_only
in: query
required: false
description: If true, this wil return only the most recent tasks.
schema:
type: boolean
get: get:
tags: tags:
- Process Instances - Process Instances
@ -960,6 +966,12 @@ paths:
description: If set will return the tasks as they were during a specific step of execution. description: If set will return the tasks as they were during a specific step of execution.
schema: schema:
type: integer type: integer
- name: most_recent_tasks_only
in: query
required: false
description: If true, this wil return only the most recent tasks.
schema:
type: boolean
get: get:
tags: tags:
- Process Instances - Process Instances

View File

@ -4,6 +4,7 @@ import json
from typing import Any from typing import Any
from typing import Dict from typing import Dict
from typing import Optional from typing import Optional
from uuid import UUID
import flask.wrappers import flask.wrappers
from flask import current_app from flask import current_app
@ -55,9 +56,18 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS
from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitCommandError
from spiffworkflow_backend.services.git_service import GitService from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsAlreadyLockedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsNotEnqueuedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import ( from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceQueueService, ProcessInstanceQueueService,
) )
@ -129,7 +139,11 @@ def process_instance_run(
try: try:
processor.lock_process_instance("Web") processor.lock_process_instance("Web")
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
except ApiError as e: except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e) ErrorHandlingService().handle_error(processor, e)
raise e raise e
except Exception as e: except Exception as e:
@ -143,6 +157,7 @@ def process_instance_run(
task=task, task=task,
) from e ) from e
finally: finally:
if ProcessInstanceLockService.has_lock(process_instance.id):
processor.unlock_process_instance("Web") processor.unlock_process_instance("Web")
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
@ -534,6 +549,7 @@ def process_instance_task_list_without_task_data_for_me(
process_instance_id: int, process_instance_id: int,
all_tasks: bool = False, all_tasks: bool = False,
spiff_step: int = 0, spiff_step: int = 0,
most_recent_tasks_only: bool = False,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_instance_task_list_without_task_data_for_me.""" """Process_instance_task_list_without_task_data_for_me."""
process_instance = _find_process_instance_for_me_or_raise(process_instance_id) process_instance = _find_process_instance_for_me_or_raise(process_instance_id)
@ -542,6 +558,7 @@ def process_instance_task_list_without_task_data_for_me(
process_instance, process_instance,
all_tasks, all_tasks,
spiff_step, spiff_step,
most_recent_tasks_only,
) )
@ -550,6 +567,7 @@ def process_instance_task_list_without_task_data(
process_instance_id: int, process_instance_id: int,
all_tasks: bool = False, all_tasks: bool = False,
spiff_step: int = 0, spiff_step: int = 0,
most_recent_tasks_only: bool = False,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_instance_task_list_without_task_data.""" """Process_instance_task_list_without_task_data."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -558,6 +576,7 @@ def process_instance_task_list_without_task_data(
process_instance, process_instance,
all_tasks, all_tasks,
spiff_step, spiff_step,
most_recent_tasks_only,
) )
@ -582,50 +601,33 @@ def process_instance_task_list(
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
full_bpmn_process_dict = processor.full_bpmn_process_dict full_bpmn_process_dict = processor.full_bpmn_process_dict
tasks = full_bpmn_process_dict["tasks"] tasks = full_bpmn_process_dict["tasks"]
subprocesses = full_bpmn_process_dict["subprocesses"] subprocesses = full_bpmn_process_dict["subprocesses"]
steps_by_id = {step_detail.task_id: step_detail for step_detail in step_details} steps_by_id = {step_detail.task_id: step_detail for step_detail in step_details}
subprocess_state_overrides = {} def restore_task(spiff_task: dict[str, Any], step_ended: float) -> None:
for step_detail in step_details: if spiff_task["last_state_change"] > step_ended:
if step_detail.task_id in tasks: spiff_task["state"] = Task.task_state_name_to_int("FUTURE")
tasks[step_detail.task_id]["state"] = Task.task_state_name_to_int( spiff_task["data"] = {}
step_detail.task_state
)
else:
for subprocess_id, subprocess_info in subprocesses.items():
if step_detail.task_id in subprocess_info["tasks"]:
subprocess_info["tasks"][step_detail.task_id]["state"] = (
Task.task_state_name_to_int(step_detail.task_state)
)
subprocess_state_overrides[subprocess_id] = TaskState.WAITING
for subprocess_info in subprocesses.values(): if spiff_step > 0:
for spiff_task_id in subprocess_info["tasks"]: last_change = step_details[-1].end_in_seconds or 0
if spiff_task_id not in steps_by_id: for spiff_task in tasks.values():
subprocess_info["tasks"][spiff_task_id]["data"] = {} restore_task(spiff_task, last_change)
subprocess_info["tasks"][spiff_task_id]["state"] = ( for subprocess in subprocesses.values():
subprocess_state_overrides.get(spiff_task_id, TaskState.FUTURE) for spiff_task in subprocess["tasks"].values():
) restore_task(spiff_task, last_change)
for spiff_task_id in tasks:
if spiff_task_id not in steps_by_id:
tasks[spiff_task_id]["data"] = {}
tasks[spiff_task_id]["state"] = subprocess_state_overrides.get(
spiff_task_id, TaskState.FUTURE
)
bpmn_process_instance = ProcessInstanceProcessor._serializer.workflow_from_dict( bpmn_process_instance = ProcessInstanceProcessor._serializer.workflow_from_dict(
full_bpmn_process_dict full_bpmn_process_dict
) )
if spiff_step > 0:
spiff_task = processor.__class__.get_task_by_bpmn_identifier( bpmn_process_instance.complete_task_from_id(UUID(step_details[-1].task_id))
step_details[-1].bpmn_task_identifier, bpmn_process_instance for subprocess_id, subprocess in bpmn_process_instance.subprocesses.items():
) if not subprocess.is_completed():
if spiff_task is not None and spiff_task.state != TaskState.READY: task = bpmn_process_instance.get_task(subprocess_id)
spiff_task.complete() task._set_state(TaskState.WAITING)
spiff_tasks = None spiff_tasks = None
if all_tasks: if all_tasks:
@ -641,21 +643,24 @@ def process_instance_task_list(
subprocesses_by_child_task_ids, task_typename_by_task_id subprocesses_by_child_task_ids, task_typename_by_task_id
) )
tasks = []
spiff_tasks_to_process = spiff_tasks spiff_tasks_to_process = spiff_tasks
if most_recent_tasks_only: if most_recent_tasks_only:
spiff_tasks_by_process_id_and_task_name: dict[str, SpiffTask] = {} spiff_tasks_by_process_id_and_task_name: dict[str, SpiffTask] = {}
for spiff_task in spiff_tasks: current_tasks = {}
for spiff_task in spiff_tasks_to_process:
row_id = f"{spiff_task.task_spec._wf_spec.name}:{spiff_task.task_spec.name}" row_id = f"{spiff_task.task_spec._wf_spec.name}:{spiff_task.task_spec.name}"
if spiff_task.state in [TaskState.READY, TaskState.WAITING]:
current_tasks[row_id] = spiff_task
if ( if (
row_id not in spiff_tasks_by_process_id_and_task_name row_id not in spiff_tasks_by_process_id_and_task_name
or spiff_task.last_state_change or spiff_task.state
> spiff_tasks_by_process_id_and_task_name[row_id].last_state_change > spiff_tasks_by_process_id_and_task_name[row_id].state
): ):
spiff_tasks_by_process_id_and_task_name[row_id] = spiff_task spiff_tasks_by_process_id_and_task_name[row_id] = spiff_task
spiff_tasks_by_process_id_and_task_name.update(current_tasks)
spiff_tasks_to_process = spiff_tasks_by_process_id_and_task_name.values() spiff_tasks_to_process = spiff_tasks_by_process_id_and_task_name.values()
response = []
for spiff_task in spiff_tasks_to_process: for spiff_task in spiff_tasks_to_process:
task_spiff_step: Optional[int] = None task_spiff_step: Optional[int] = None
if str(spiff_task.id) in steps_by_id: if str(spiff_task.id) in steps_by_id:
@ -669,9 +674,11 @@ def process_instance_task_list(
calling_subprocess_task_id=calling_subprocess_task_id, calling_subprocess_task_id=calling_subprocess_task_id,
task_spiff_step=task_spiff_step, task_spiff_step=task_spiff_step,
) )
tasks.append(task) if task.state in ["MAYBE", "LIKELY"]:
task.state = "FUTURE"
response.append(task)
return make_response(jsonify(tasks), 200) return make_response(jsonify(response), 200)
def process_instance_reset( def process_instance_reset(

View File

@ -1,5 +1,6 @@
import time import time
from typing import List from typing import List
from typing import Optional
from flask import current_app from flask import current_app
@ -14,6 +15,10 @@ from spiffworkflow_backend.services.process_instance_lock_service import (
) )
class ProcessInstanceIsNotEnqueuedError(Exception):
pass
class ProcessInstanceIsAlreadyLockedError(Exception): class ProcessInstanceIsAlreadyLockedError(Exception):
pass pass
@ -62,21 +67,51 @@ class ProcessInstanceQueueService:
db.session.query(ProcessInstanceQueueModel) db.session.query(ProcessInstanceQueueModel)
.filter( .filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id, ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by == locked_by,
) )
.first() .first()
) )
if queue_entry is None: if queue_entry is None:
raise ProcessInstanceIsNotEnqueuedError(
f"{locked_by} cannot lock process instance {process_instance.id}. It"
" has not been enqueued."
)
if queue_entry.locked_by != locked_by:
raise ProcessInstanceIsAlreadyLockedError( raise ProcessInstanceIsAlreadyLockedError(
f"Cannot lock process instance {process_instance.id}. " f"{locked_by} cannot lock process instance {process_instance.id}. "
"It has already been locked or has not been enqueued." f"It has already been locked by {queue_entry.locked_by}."
) )
ProcessInstanceLockService.lock(process_instance.id, queue_entry) ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod @classmethod
def entries_with_status(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
return (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
@classmethod
def peek_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
queue_entries = cls.entries_with_status(status_value, None)
ids_with_status = [entry.process_instance_id for entry in queue_entries]
return ids_with_status
@classmethod
def dequeue_many( def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value, status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]: ) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by() locked_by = ProcessInstanceLockService.locked_by()
@ -93,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit() db.session.commit()
queue_entries = ( queue_entries = cls.entries_with_status(status_value, locked_by)
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries) locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod @staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None: def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting.""" """Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many( process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value status_value
) )
if len(locked_process_instance_ids) == 0: if len(process_instance_ids_to_check) == 0:
return return
records = ( records = (
db.session.query(ProcessInstanceModel) db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore .filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all() .all()
) )
process_instance_lock_prefix = "Background" process_instance_lock_prefix = "Background"

View File

@ -704,11 +704,8 @@ export default function ProcessInstanceListTable({
setEndToTime(''); setEndToTime('');
setProcessInitiatorSelection(null); setProcessInitiatorSelection(null);
setProcessInitiatorText(''); setProcessInitiatorText('');
if (reportMetadata) { if (reportMetadata) {
reportMetadata.columns = reportMetadata.columns.filter( reportMetadata.filter_by = [];
(column) => !column.filterable
);
} }
}; };

View File

@ -144,7 +144,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
path: `${apiPath}/${modifiedProcessModelId}/${params.process_instance_id}${queryParams}`, path: `${apiPath}/${modifiedProcessModelId}/${params.process_instance_id}${queryParams}`,
successCallback: setProcessInstance, successCallback: setProcessInstance,
}); });
let taskParams = '?all_tasks=true'; let taskParams = '?all_tasks=true&most_recent_tasks_only=true';
if (typeof params.spiff_step !== 'undefined') { if (typeof params.spiff_step !== 'undefined') {
taskParams = `${taskParams}&spiff_step=${params.spiff_step}`; taskParams = `${taskParams}&spiff_step=${params.spiff_step}`;
} }