Feature/error to suspended state (#384)
* fixed suspended message w/ burnettk * allow manually executing errored tasks when suspended w/ burnettk --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
1134e39dd1
commit
285294967d
|
@ -78,7 +78,9 @@ from spiffworkflow_backend.services.service_task_service import ServiceTaskDeleg
|
|||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
from spiffworkflow_backend.services.task_service import TaskService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
from spiffworkflow_backend.services.workflow_execution_service import ExecutionStrategy
|
||||
from spiffworkflow_backend.services.workflow_execution_service import ExecutionStrategyNotConfiguredError
|
||||
from spiffworkflow_backend.services.workflow_execution_service import SkipOneExecutionStrategy
|
||||
from spiffworkflow_backend.services.workflow_execution_service import TaskModelSavingDelegate
|
||||
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionService
|
||||
from spiffworkflow_backend.services.workflow_execution_service import execution_strategy_named
|
||||
|
@ -1093,6 +1095,9 @@ class ProcessInstanceProcessor:
|
|||
"""Mark the task complete optionally executing it."""
|
||||
spiff_task = self.bpmn_process_instance.get_task_from_id(UUID(task_id))
|
||||
event_type = ProcessInstanceEventType.task_skipped.value
|
||||
if execute:
|
||||
event_type = ProcessInstanceEventType.task_executed_manually.value
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# manual actually means any human task
|
||||
|
@ -1112,7 +1117,15 @@ class ProcessInstanceProcessor:
|
|||
self.do_engine_steps(save=True, execution_strategy_name="one_at_a_time")
|
||||
else:
|
||||
current_app.logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
|
||||
self.do_engine_steps(save=True, execution_strategy_name="skip_one")
|
||||
task_model_delegate = TaskModelSavingDelegate(
|
||||
serializer=self._serializer,
|
||||
process_instance=self.process_instance_model,
|
||||
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
|
||||
)
|
||||
execution_strategy = SkipOneExecutionStrategy(
|
||||
task_model_delegate, self.lazy_load_subprocess_specs, {"spiff_task": spiff_task}
|
||||
)
|
||||
self.do_engine_steps(save=True, execution_strategy=execution_strategy)
|
||||
|
||||
spiff_tasks = self.bpmn_process_instance.get_tasks()
|
||||
task_service = TaskService(
|
||||
|
@ -1359,18 +1372,20 @@ class ProcessInstanceProcessor:
|
|||
exit_at: None = None,
|
||||
save: bool = False,
|
||||
execution_strategy_name: str | None = None,
|
||||
execution_strategy: ExecutionStrategy | None = None,
|
||||
) -> None:
|
||||
with ProcessInstanceQueueService.dequeued(self.process_instance_model):
|
||||
# TODO: ideally we just lock in the execution service, but not sure
|
||||
# about _add_bpmn_process_definitions and if that needs to happen in
|
||||
# the same lock like it does on main
|
||||
self._do_engine_steps(exit_at, save, execution_strategy_name)
|
||||
self._do_engine_steps(exit_at, save, execution_strategy_name, execution_strategy)
|
||||
|
||||
def _do_engine_steps(
|
||||
self,
|
||||
exit_at: None = None,
|
||||
save: bool = False,
|
||||
execution_strategy_name: str | None = None,
|
||||
execution_strategy: ExecutionStrategy | None = None,
|
||||
) -> None:
|
||||
self._add_bpmn_process_definitions()
|
||||
|
||||
|
@ -1380,16 +1395,17 @@ class ProcessInstanceProcessor:
|
|||
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
|
||||
)
|
||||
|
||||
if execution_strategy_name is None:
|
||||
execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"]
|
||||
if execution_strategy_name is None:
|
||||
raise ExecutionStrategyNotConfiguredError(
|
||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set"
|
||||
if execution_strategy is None:
|
||||
if execution_strategy_name is None:
|
||||
execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"]
|
||||
if execution_strategy_name is None:
|
||||
raise ExecutionStrategyNotConfiguredError(
|
||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set"
|
||||
)
|
||||
execution_strategy = execution_strategy_named(
|
||||
execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs
|
||||
)
|
||||
|
||||
execution_strategy = execution_strategy_named(
|
||||
execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs
|
||||
)
|
||||
execution_service = WorkflowExecutionService(
|
||||
self.bpmn_process_instance,
|
||||
self.process_instance_model,
|
||||
|
|
|
@ -19,6 +19,8 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel
|
|||
from spiffworkflow_backend.models.process_instance import ProcessInstanceApi
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
|
||||
|
@ -189,6 +191,7 @@ class ProcessInstanceService:
|
|||
def do_waiting(cls, status_value: str) -> None:
|
||||
run_at_in_seconds_threshold = round(time.time())
|
||||
min_age_in_seconds = 60 # to avoid conflicts with the interstitial page, we wait 60 seconds before processing
|
||||
# min_age_in_seconds = 0 # to avoid conflicts with the interstitial page, we wait 60 seconds before processing
|
||||
process_instance_ids_to_check = ProcessInstanceQueueService.peek_many(
|
||||
status_value, run_at_in_seconds_threshold, min_age_in_seconds
|
||||
)
|
||||
|
@ -506,6 +509,7 @@ class ProcessInstanceService:
|
|||
add_docs_and_forms: bool = False,
|
||||
) -> Task:
|
||||
task_type = spiff_task.task_spec.description
|
||||
task_guid = str(spiff_task.id)
|
||||
|
||||
props = {}
|
||||
if hasattr(spiff_task.task_spec, "extensions"):
|
||||
|
@ -521,9 +525,7 @@ class ProcessInstanceService:
|
|||
# can complete it.
|
||||
can_complete = False
|
||||
try:
|
||||
AuthorizationService.assert_user_can_complete_task(
|
||||
processor.process_instance_model.id, str(spiff_task.id), g.user
|
||||
)
|
||||
AuthorizationService.assert_user_can_complete_task(processor.process_instance_model.id, task_guid, g.user)
|
||||
can_complete = True
|
||||
except HumanTaskAlreadyCompletedError:
|
||||
can_complete = False
|
||||
|
@ -540,9 +542,11 @@ class ProcessInstanceService:
|
|||
|
||||
# Grab the last error message.
|
||||
error_message = None
|
||||
for event in processor.process_instance_model.process_instance_events:
|
||||
for detail in event.error_details:
|
||||
error_message = detail.message
|
||||
error_event = ProcessInstanceEventModel.query.filter_by(
|
||||
task_guid=task_guid, event_type=ProcessInstanceEventType.task_failed.value
|
||||
).first()
|
||||
if error_event:
|
||||
error_message = error_event.error_details[-1].message
|
||||
|
||||
task = Task(
|
||||
spiff_task.id,
|
||||
|
|
|
@ -76,9 +76,12 @@ SubprocessSpecLoader = Callable[[], dict[str, Any] | None]
|
|||
class ExecutionStrategy:
|
||||
"""Interface of sorts for a concrete execution strategy."""
|
||||
|
||||
def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader):
|
||||
def __init__(
|
||||
self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader, options: dict | None = None
|
||||
):
|
||||
self.delegate = delegate
|
||||
self.subprocess_spec_loader = subprocess_spec_loader
|
||||
self.options = options
|
||||
|
||||
@abstractmethod
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
|
@ -336,9 +339,14 @@ class SkipOneExecutionStrategy(ExecutionStrategy):
|
|||
"""When you want to to skip over the next task, rather than execute it."""
|
||||
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
|
||||
if len(engine_steps) > 0:
|
||||
spiff_task = engine_steps[0]
|
||||
spiff_task = None
|
||||
if self.options and "spiff_task" in self.options.keys():
|
||||
spiff_task = self.options["spiff_task"]
|
||||
else:
|
||||
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
|
||||
if len(engine_steps) > 0:
|
||||
spiff_task = engine_steps[0]
|
||||
if spiff_task is not None:
|
||||
self.delegate.will_complete_task(spiff_task)
|
||||
spiff_task.complete()
|
||||
self.delegate.did_complete_task(spiff_task)
|
||||
|
|
|
@ -15,6 +15,8 @@ from spiffworkflow_backend.models.db import db
|
|||
from spiffworkflow_backend.models.process_group import ProcessGroup
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel
|
||||
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
|
||||
from spiffworkflow_backend.models.process_instance_report import ReportMetadata
|
||||
|
@ -2506,6 +2508,11 @@ class TestProcessApi(BaseTest):
|
|||
assert response.status_code == 200
|
||||
assert len(response.json) == 7
|
||||
|
||||
task_event = ProcessInstanceEventModel.query.filter_by(
|
||||
task_guid=human_task["guid"], event_type=ProcessInstanceEventType.task_skipped.value
|
||||
).first()
|
||||
assert task_event is not None
|
||||
|
||||
def setup_initial_groups_for_move_tests(self, client: FlaskClient, with_super_admin_user: UserModel) -> None:
|
||||
groups = ["group_a", "group_b", "group_b/group_bb"]
|
||||
# setup initial groups
|
||||
|
|
|
@ -11,6 +11,7 @@ from spiffworkflow_backend.models.db import db
|
|||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
|
||||
|
@ -464,6 +465,11 @@ class TestProcessInstanceProcessor(BaseTest):
|
|||
"stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
|
||||
)
|
||||
|
||||
task_event = ProcessInstanceEventModel.query.filter_by(
|
||||
task_guid=human_task_one.task_id, event_type=ProcessInstanceEventType.task_executed_manually.value
|
||||
).first()
|
||||
assert task_event is not None
|
||||
|
||||
def test_step_through_gateway(
|
||||
self,
|
||||
app: Flask,
|
||||
|
@ -501,6 +507,11 @@ class TestProcessInstanceProcessor(BaseTest):
|
|||
assert gateway_task is not None
|
||||
assert gateway_task.state == TaskState.COMPLETED
|
||||
|
||||
task_event = ProcessInstanceEventModel.query.filter_by(
|
||||
task_guid=str(gateway_task.id), event_type=ProcessInstanceEventType.task_executed_manually.value
|
||||
).first()
|
||||
assert task_event is not None
|
||||
|
||||
def test_properly_saves_tasks_when_running(
|
||||
self,
|
||||
app: Flask,
|
||||
|
|
|
@ -147,7 +147,7 @@ export default function ProcessInterstitial({
|
|||
if (['terminated', 'suspended'].includes(pi.status)) {
|
||||
return inlineMessage(
|
||||
`Process ${pi.status}`,
|
||||
'This process instance was {pi.status} by an administrator. Please get in touch with them for more information.',
|
||||
`This process instance was ${pi.status} by an administrator. Please get in touch with them for more information.`,
|
||||
'warning'
|
||||
);
|
||||
}
|
||||
|
|
|
@ -488,7 +488,9 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
|
|||
const initializeTaskDataToDisplay = (task: Task | null) => {
|
||||
if (
|
||||
task &&
|
||||
(task.state === 'COMPLETED' || task.state === 'READY') &&
|
||||
(task.state === 'COMPLETED' ||
|
||||
task.state === 'ERROR' ||
|
||||
task.state === 'READY') &&
|
||||
ability.can('GET', targetUris.processInstanceTaskDataPath)
|
||||
) {
|
||||
setShowTaskDataLoading(true);
|
||||
|
@ -619,7 +621,10 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
|
|||
return (
|
||||
(task.state === 'WAITING' &&
|
||||
subprocessTypes.filter((t) => t === task.typename).length > 0) ||
|
||||
task.state === 'READY'
|
||||
task.state === 'READY' ||
|
||||
(processInstance &&
|
||||
processInstance.status === 'suspended' &&
|
||||
task.state === 'ERROR')
|
||||
);
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue