Merge remote-tracking branch 'origin/main' into feature/add_task_definition_to_task

This commit is contained in:
jasquat 2023-03-15 16:11:57 -04:00
commit 9c4fb0446c
No known key found for this signature in database
6 changed files with 107 additions and 63 deletions

View File

@ -913,6 +913,12 @@ paths:
description: If set will return the tasks as they were during a specific step of execution.
schema:
type: integer
- name: most_recent_tasks_only
in: query
required: false
description: If true, this will return only the most recent tasks.
schema:
type: boolean
get:
tags:
- Process Instances
@ -960,6 +966,12 @@ paths:
description: If set will return the tasks as they were during a specific step of execution.
schema:
type: integer
- name: most_recent_tasks_only
in: query
required: false
description: If true, this will return only the most recent tasks.
schema:
type: boolean
get:
tags:
- Process Instances

View File

@ -4,6 +4,7 @@ import json
from typing import Any
from typing import Dict
from typing import Optional
from uuid import UUID
import flask.wrappers
from flask import current_app
@ -55,9 +56,18 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS
from spiffworkflow_backend.services.git_service import GitCommandError
from spiffworkflow_backend.services.git_service import GitService
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_lock_service import (
ProcessInstanceLockService,
)
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsAlreadyLockedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceIsNotEnqueuedError,
)
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceQueueService,
)
@ -129,7 +139,11 @@ def process_instance_run(
try:
processor.lock_process_instance("Web")
processor.do_engine_steps(save=True)
except ApiError as e:
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
ErrorHandlingService().handle_error(processor, e)
raise e
except Exception as e:
@ -143,7 +157,8 @@ def process_instance_run(
task=task,
) from e
finally:
processor.unlock_process_instance("Web")
if ProcessInstanceLockService.has_lock(process_instance.id):
processor.unlock_process_instance("Web")
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances()
@ -534,6 +549,7 @@ def process_instance_task_list_without_task_data_for_me(
process_instance_id: int,
all_tasks: bool = False,
spiff_step: int = 0,
most_recent_tasks_only: bool = False,
) -> flask.wrappers.Response:
"""Process_instance_task_list_without_task_data_for_me."""
process_instance = _find_process_instance_for_me_or_raise(process_instance_id)
@ -542,6 +558,7 @@ def process_instance_task_list_without_task_data_for_me(
process_instance,
all_tasks,
spiff_step,
most_recent_tasks_only,
)
@ -550,6 +567,7 @@ def process_instance_task_list_without_task_data(
process_instance_id: int,
all_tasks: bool = False,
spiff_step: int = 0,
most_recent_tasks_only: bool = False,
) -> flask.wrappers.Response:
"""Process_instance_task_list_without_task_data."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -558,6 +576,7 @@ def process_instance_task_list_without_task_data(
process_instance,
all_tasks,
spiff_step,
most_recent_tasks_only,
)
@ -582,50 +601,33 @@ def process_instance_task_list(
processor = ProcessInstanceProcessor(process_instance)
full_bpmn_process_dict = processor.full_bpmn_process_dict
tasks = full_bpmn_process_dict["tasks"]
subprocesses = full_bpmn_process_dict["subprocesses"]
steps_by_id = {step_detail.task_id: step_detail for step_detail in step_details}
subprocess_state_overrides = {}
for step_detail in step_details:
if step_detail.task_id in tasks:
tasks[step_detail.task_id]["state"] = Task.task_state_name_to_int(
step_detail.task_state
)
else:
for subprocess_id, subprocess_info in subprocesses.items():
if step_detail.task_id in subprocess_info["tasks"]:
subprocess_info["tasks"][step_detail.task_id]["state"] = (
Task.task_state_name_to_int(step_detail.task_state)
)
subprocess_state_overrides[subprocess_id] = TaskState.WAITING
def restore_task(spiff_task: dict[str, Any], step_ended: float) -> None:
if spiff_task["last_state_change"] > step_ended:
spiff_task["state"] = Task.task_state_name_to_int("FUTURE")
spiff_task["data"] = {}
for subprocess_info in subprocesses.values():
for spiff_task_id in subprocess_info["tasks"]:
if spiff_task_id not in steps_by_id:
subprocess_info["tasks"][spiff_task_id]["data"] = {}
subprocess_info["tasks"][spiff_task_id]["state"] = (
subprocess_state_overrides.get(spiff_task_id, TaskState.FUTURE)
)
for spiff_task_id in tasks:
if spiff_task_id not in steps_by_id:
tasks[spiff_task_id]["data"] = {}
tasks[spiff_task_id]["state"] = subprocess_state_overrides.get(
spiff_task_id, TaskState.FUTURE
)
if spiff_step > 0:
last_change = step_details[-1].end_in_seconds or 0
for spiff_task in tasks.values():
restore_task(spiff_task, last_change)
for subprocess in subprocesses.values():
for spiff_task in subprocess["tasks"].values():
restore_task(spiff_task, last_change)
bpmn_process_instance = ProcessInstanceProcessor._serializer.workflow_from_dict(
full_bpmn_process_dict
)
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
step_details[-1].bpmn_task_identifier, bpmn_process_instance
)
if spiff_task is not None and spiff_task.state != TaskState.READY:
spiff_task.complete()
if spiff_step > 0:
bpmn_process_instance.complete_task_from_id(UUID(step_details[-1].task_id))
for subprocess_id, subprocess in bpmn_process_instance.subprocesses.items():
if not subprocess.is_completed():
task = bpmn_process_instance.get_task(subprocess_id)
task._set_state(TaskState.WAITING)
spiff_tasks = None
if all_tasks:
@ -641,21 +643,24 @@ def process_instance_task_list(
subprocesses_by_child_task_ids, task_typename_by_task_id
)
tasks = []
spiff_tasks_to_process = spiff_tasks
if most_recent_tasks_only:
spiff_tasks_by_process_id_and_task_name: dict[str, SpiffTask] = {}
for spiff_task in spiff_tasks:
current_tasks = {}
for spiff_task in spiff_tasks_to_process:
row_id = f"{spiff_task.task_spec._wf_spec.name}:{spiff_task.task_spec.name}"
if spiff_task.state in [TaskState.READY, TaskState.WAITING]:
current_tasks[row_id] = spiff_task
if (
row_id not in spiff_tasks_by_process_id_and_task_name
or spiff_task.last_state_change
> spiff_tasks_by_process_id_and_task_name[row_id].last_state_change
or spiff_task.state
> spiff_tasks_by_process_id_and_task_name[row_id].state
):
spiff_tasks_by_process_id_and_task_name[row_id] = spiff_task
spiff_tasks_by_process_id_and_task_name.update(current_tasks)
spiff_tasks_to_process = spiff_tasks_by_process_id_and_task_name.values()
response = []
for spiff_task in spiff_tasks_to_process:
task_spiff_step: Optional[int] = None
if str(spiff_task.id) in steps_by_id:
@ -669,9 +674,11 @@ def process_instance_task_list(
calling_subprocess_task_id=calling_subprocess_task_id,
task_spiff_step=task_spiff_step,
)
tasks.append(task)
if task.state in ["MAYBE", "LIKELY"]:
task.state = "FUTURE"
response.append(task)
return make_response(jsonify(tasks), 200)
return make_response(jsonify(response), 200)
def process_instance_reset(

View File

@ -1,5 +1,6 @@
import time
from typing import List
from typing import Optional
from flask import current_app
@ -14,6 +15,10 @@ from spiffworkflow_backend.services.process_instance_lock_service import (
)
class ProcessInstanceIsNotEnqueuedError(Exception):
    """Raised when a lock is requested for a process instance that has no queue entry.

    The queue service looks up the instance's ProcessInstanceQueueModel row before
    locking; if no row exists the instance was never enqueued and cannot be locked.
    """

    pass
class ProcessInstanceIsAlreadyLockedError(Exception):
    """Raised when a lock is requested for a process instance already locked by someone else.

    The queue entry's ``locked_by`` value did not match the requester, so the
    instance is currently held by a different worker/context.
    """

    pass
@ -62,21 +67,51 @@ class ProcessInstanceQueueService:
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.first()
)
if queue_entry is None:
raise ProcessInstanceIsNotEnqueuedError(
f"{locked_by} cannot lock process instance {process_instance.id}. It"
" has not been enqueued."
)
if queue_entry.locked_by != locked_by:
raise ProcessInstanceIsAlreadyLockedError(
f"Cannot lock process instance {process_instance.id}. "
"It has already been locked or has not been enqueued."
f"{locked_by} cannot lock process instance {process_instance.id}. "
f"It has already been locked by {queue_entry.locked_by}."
)
ProcessInstanceLockService.lock(process_instance.id, queue_entry)
@staticmethod
@classmethod
def entries_with_status(
    cls,
    status_value: str = ProcessInstanceStatus.waiting.value,
    locked_by: Optional[str] = None,
) -> List[ProcessInstanceQueueModel]:
    """Return every queue entry matching the given status and locked_by values.

    Passing ``locked_by=None`` selects entries that are not locked by anyone.
    """
    matching_entries_query = db.session.query(ProcessInstanceQueueModel).filter(
        ProcessInstanceQueueModel.status == status_value,
        ProcessInstanceQueueModel.locked_by == locked_by,
    )
    return matching_entries_query.all()
@classmethod
def peek_many(
    cls,
    status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
    """Return ids of unlocked queue entries with the given status, without locking them."""
    return [
        entry.process_instance_id
        for entry in cls.entries_with_status(status_value, None)
    ]
@classmethod
def dequeue_many(
cls,
status_value: str = ProcessInstanceStatus.waiting.value,
) -> List[int]:
locked_by = ProcessInstanceLockService.locked_by()
@ -93,14 +128,7 @@ class ProcessInstanceQueueService:
db.session.commit()
queue_entries = (
db.session.query(ProcessInstanceQueueModel)
.filter(
ProcessInstanceQueueModel.status == status_value,
ProcessInstanceQueueModel.locked_by == locked_by,
)
.all()
)
queue_entries = cls.entries_with_status(status_value, locked_by)
locked_ids = ProcessInstanceLockService.lock_many(queue_entries)

View File

@ -84,15 +84,15 @@ class ProcessInstanceService:
@staticmethod
def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None:
"""Do_waiting."""
locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many(
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
status_value
)
if len(locked_process_instance_ids) == 0:
if len(process_instance_ids_to_check) == 0:
return
records = (
db.session.query(ProcessInstanceModel)
.filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore
.filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore
.all()
)
process_instance_lock_prefix = "Background"

View File

@ -704,11 +704,8 @@ export default function ProcessInstanceListTable({
setEndToTime('');
setProcessInitiatorSelection(null);
setProcessInitiatorText('');
if (reportMetadata) {
reportMetadata.columns = reportMetadata.columns.filter(
(column) => !column.filterable
);
reportMetadata.filter_by = [];
}
};

View File

@ -144,7 +144,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
path: `${apiPath}/${modifiedProcessModelId}/${params.process_instance_id}${queryParams}`,
successCallback: setProcessInstance,
});
let taskParams = '?all_tasks=true';
let taskParams = '?all_tasks=true&most_recent_tasks_only=true';
if (typeof params.spiff_step !== 'undefined') {
taskParams = `${taskParams}&spiff_step=${params.spiff_step}`;
}