resetting tasks somewhat work

This commit is contained in:
jasquat 2023-03-22 09:44:13 -04:00
parent 034201b01c
commit 26af07befd
6 changed files with 152 additions and 70 deletions

View File

@ -901,12 +901,6 @@ paths:
description: The identifier of the process to use for the diagram. Useful for displaying the diagram for a call activity. description: The identifier of the process to use for the diagram. Useful for displaying the diagram for a call activity.
schema: schema:
type: string type: string
- name: all_tasks
in: query
required: false
description: If true, this wil return all tasks associated with the process instance and not just user tasks.
schema:
type: boolean
- name: most_recent_tasks_only - name: most_recent_tasks_only
in: query in: query
required: false required: false
@ -960,12 +954,6 @@ paths:
description: The identifier of the process to use for the diagram. Useful for displaying the diagram for a call activity. description: The identifier of the process to use for the diagram. Useful for displaying the diagram for a call activity.
schema: schema:
type: string type: string
- name: all_tasks
in: query
required: false
description: If true, this wil return all tasks associated with the process instance and not just user tasks.
schema:
type: boolean
- name: most_recent_tasks_only - name: most_recent_tasks_only
in: query in: query
required: false required: false
@ -1188,7 +1176,7 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/process-instance-reset/{modified_process_model_identifier}/{process_instance_id}/{spiff_step}: /process-instance-reset/{modified_process_model_identifier}/{process_instance_id}/{to_task_guid}:
parameters: parameters:
- name: modified_process_model_identifier - name: modified_process_model_identifier
in: path in: path
@ -1202,12 +1190,12 @@ paths:
description: The unique id of an existing process instance. description: The unique id of an existing process instance.
schema: schema:
type: integer type: integer
- name: spiff_step - name: to_task_guid
in: query in: path
required: false required: true
description: Reset the process to this state description: Get the tasks only up to the given guid.
schema: schema:
type: integer type: string
post: post:
operationId: spiffworkflow_backend.routes.process_instances_controller.process_instance_reset operationId: spiffworkflow_backend.routes.process_instances_controller.process_instance_reset
summary: Reset a process instance to an earlier step summary: Reset a process instance to an earlier step

View File

@ -63,7 +63,7 @@ class TaskModel(SpiffworkflowBaseDBModel):
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
python_env_data_hash: str = db.Column(db.String(255), nullable=False, index=True) python_env_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
start_in_seconds: float = db.Column(db.DECIMAL(17, 6)) start_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6))
end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6)) end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6))
data: Optional[dict] = None data: Optional[dict] = None

View File

@ -555,8 +555,6 @@ def process_instance_report_show(
def process_instance_task_list_without_task_data_for_me( def process_instance_task_list_without_task_data_for_me(
modified_process_model_identifier: str, modified_process_model_identifier: str,
process_instance_id: int, process_instance_id: int,
all_tasks: bool = False,
spiff_step: int = 0,
most_recent_tasks_only: bool = False, most_recent_tasks_only: bool = False,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
to_task_guid: Optional[str] = None, to_task_guid: Optional[str] = None,
@ -566,8 +564,6 @@ def process_instance_task_list_without_task_data_for_me(
return process_instance_task_list( return process_instance_task_list(
_modified_process_model_identifier=modified_process_model_identifier, _modified_process_model_identifier=modified_process_model_identifier,
process_instance=process_instance, process_instance=process_instance,
all_tasks=all_tasks,
spiff_step=spiff_step,
most_recent_tasks_only=most_recent_tasks_only, most_recent_tasks_only=most_recent_tasks_only,
bpmn_process_guid=bpmn_process_guid, bpmn_process_guid=bpmn_process_guid,
to_task_guid=to_task_guid, to_task_guid=to_task_guid,
@ -577,8 +573,6 @@ def process_instance_task_list_without_task_data_for_me(
def process_instance_task_list_without_task_data( def process_instance_task_list_without_task_data(
modified_process_model_identifier: str, modified_process_model_identifier: str,
process_instance_id: int, process_instance_id: int,
all_tasks: bool = False,
spiff_step: int = 0,
most_recent_tasks_only: bool = False, most_recent_tasks_only: bool = False,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
to_task_guid: Optional[str] = None, to_task_guid: Optional[str] = None,
@ -588,8 +582,6 @@ def process_instance_task_list_without_task_data(
return process_instance_task_list( return process_instance_task_list(
_modified_process_model_identifier=modified_process_model_identifier, _modified_process_model_identifier=modified_process_model_identifier,
process_instance=process_instance, process_instance=process_instance,
all_tasks=all_tasks,
spiff_step=spiff_step,
most_recent_tasks_only=most_recent_tasks_only, most_recent_tasks_only=most_recent_tasks_only,
bpmn_process_guid=bpmn_process_guid, bpmn_process_guid=bpmn_process_guid,
to_task_guid=to_task_guid, to_task_guid=to_task_guid,
@ -600,8 +592,6 @@ def process_instance_task_list(
_modified_process_model_identifier: str, _modified_process_model_identifier: str,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
all_tasks: bool = False,
spiff_step: int = 0,
to_task_guid: Optional[str] = None, to_task_guid: Optional[str] = None,
most_recent_tasks_only: bool = False, most_recent_tasks_only: bool = False,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
@ -679,12 +669,11 @@ def process_instance_task_list(
def process_instance_reset( def process_instance_reset(
process_instance_id: int, process_instance_id: int,
modified_process_model_identifier: str, modified_process_model_identifier: str,
spiff_step: int = 0, to_task_guid: str,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Reset a process instance to a particular step.""" """Reset a process instance to a particular step."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
processor = ProcessInstanceProcessor(process_instance) ProcessInstanceProcessor.reset_process(process_instance, to_task_guid, commit=True)
processor.reset_process(spiff_step)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")

View File

@ -52,6 +52,8 @@ from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ign
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import and_
from sqlalchemy import or_
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
@ -85,7 +87,8 @@ from spiffworkflow_backend.models.script_attributes_context import (
) )
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel
from spiffworkflow_backend.models.task import TaskNotFoundError
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
@ -154,10 +157,6 @@ class SpiffStepDetailIsMissingError(Exception):
pass pass
class TaskNotFoundError(Exception):
pass
class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # type: ignore class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # type: ignore
def __init__(self, environment_globals: Dict[str, Any]): def __init__(self, environment_globals: Dict[str, Any]):
"""BoxedTaskDataBasedScriptEngineEnvironment.""" """BoxedTaskDataBasedScriptEngineEnvironment."""
@ -1312,48 +1311,103 @@ class ProcessInstanceProcessor:
# Saving the workflow seems to reset the status # Saving the workflow seems to reset the status
self.suspend() self.suspend()
def reset_process(self, spiff_step: int) -> None: @classmethod
def reset_process(
cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False
) -> None:
"""Reset a process to an earlier state.""" """Reset a process to an earlier state."""
spiff_logger = logging.getLogger("spiff") cls.add_event_to_process_instance(
spiff_logger.info( process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
f"Process reset from step {spiff_step}",
extra=self.bpmn_process_instance.log_info(),
) )
step_detail = ( to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
db.session.query(SpiffStepDetailsModel) if to_task_model is None:
.filter( raise TaskNotFoundError(
SpiffStepDetailsModel.process_instance_id == self.process_instance_model.id, f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
SpiffStepDetailsModel.spiff_step == spiff_step,
) )
.first()
parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
to_task_model
) )
if step_detail is not None: task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
self.increment_spiff_step() parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes]
self.add_step( tasks_to_update_query = db.session.query(TaskModel).filter(
{ and_(
"process_instance_id": self.process_instance_model.id, or_(
"spiff_step": self.process_instance_model.spiff_step or 1, TaskModel.end_in_seconds > to_task_model.end_in_seconds,
"task_json": step_detail.task_json, TaskModel.end_in_seconds.is_not(None), # type: ignore
"timestamp": round(time.time()), ),
} TaskModel.process_instance_id == process_instance.id,
TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
)
)
tasks_to_update = tasks_to_update_query.all()
# run all queries before making changes to task_model
if commit:
tasks_to_delete_query = db.session.query(TaskModel).filter(
and_(
or_(
TaskModel.end_in_seconds > to_task_model.end_in_seconds,
TaskModel.end_in_seconds.is_not(None), # type: ignore
),
TaskModel.process_instance_id == process_instance.id,
TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
)
) )
dct = self._serializer.workflow_to_dict(self.bpmn_process_instance) tasks_to_delete = tasks_to_delete_query.all()
dct["tasks"] = step_detail.task_json["tasks"]
dct["subprocesses"] = step_detail.task_json["subprocesses"]
self.bpmn_process_instance = self._serializer.workflow_from_dict(dct)
# Cascade does not seems to work on filters, only directly through the session # delete any later tasks from to_task_model and delete bpmn processes that may be
tasks = self.bpmn_process_instance.get_tasks(TaskState.NOT_FINISHED_MASK) # link directly to one of those tasks.
rows = HumanTaskModel.query.filter( tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
HumanTaskModel.task_id.in_(str(t.id) for t in tasks) # type: ignore tasks_to_delete_ids = [t.id for t in tasks_to_delete]
bpmn_processes_to_delete = BpmnProcessModel.query.filter(
BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore
).all()
human_tasks_to_delete = HumanTaskModel.query.filter(
HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
).all() ).all()
for row in rows:
db.session.delete(row)
self.save() # ensure the correct order for foreign keys
self.suspend() for human_task_to_delete in human_tasks_to_delete:
db.session.delete(human_task_to_delete)
db.session.commit()
for task_to_delete in tasks_to_delete:
db.session.delete(task_to_delete)
db.session.commit()
for bpmn_process_to_delete in bpmn_processes_to_delete:
db.session.delete(bpmn_process_to_delete)
db.session.commit()
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
if related_human_task is not None:
db.session.delete(related_human_task)
for task_to_update in tasks_to_update:
TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
if parent_task_model is None:
raise TaskNotFoundError(
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
)
TaskService.reset_task_model(
to_task_model,
state="READY",
json_data_hash=parent_task_model.json_data_hash,
python_env_data_hash=parent_task_model.python_env_data_hash,
commit=commit,
)
for task_model in task_models_of_parent_bpmn_processes:
TaskService.reset_task_model(task_model, state="WAITING", commit=commit)
if commit:
processor = ProcessInstanceProcessor(process_instance)
processor.save()
processor.suspend()
@staticmethod @staticmethod
def get_parser() -> MyCustomParser: def get_parser() -> MyCustomParser:

View File

@ -9,6 +9,7 @@ from flask import current_app
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.task import TaskStateNames from SpiffWorkflow.task import TaskStateNames
from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert from sqlalchemy.dialects.postgresql import insert as postgres_insert
@ -317,6 +318,56 @@ class TaskService:
return bpmn_processes + cls.bpmn_process_and_descendants(direct_children) return bpmn_processes + cls.bpmn_process_and_descendants(direct_children)
return bpmn_processes return bpmn_processes
@classmethod
def task_models_of_parent_bpmn_processes(
cls, task_model: TaskModel
) -> Tuple[list[BpmnProcessModel], list[TaskModel]]:
bpmn_process = task_model.bpmn_process
task_models: list[TaskModel] = []
bpmn_processes: list[BpmnProcessModel] = [bpmn_process]
if bpmn_process.guid is not None:
parent_task_model = TaskModel.query.filter_by(guid=bpmn_process.guid).first()
if parent_task_model is not None:
b, t = cls.task_models_of_parent_bpmn_processes(parent_task_model)
return (bpmn_processes + b, [parent_task_model] + t)
return (bpmn_processes, task_models)
@classmethod
def reset_task_model(
cls,
task_model: TaskModel,
state: str,
commit: Optional[bool] = True,
json_data_hash: Optional[str] = None,
python_env_data_hash: Optional[str] = None,
) -> None:
if json_data_hash is None:
TaskService.update_task_data_on_task_model(task_model, {}, "json_data_hash")
else:
task_model.json_data_hash = json_data_hash
if python_env_data_hash is None:
TaskService.update_task_data_on_task_model(task_model, {}, "python_env_data")
else:
task_model.python_env_data_hash = python_env_data_hash
new_properties_json = task_model.properties_json
task_model.state = state
task_model.start_in_seconds = None
task_model.end_in_seconds = None
if commit:
db.session.add(task_model)
db.session.commit()
new_properties_json["state"] = getattr(TaskState, state)
task_model.properties_json = new_properties_json
if commit:
# if we commit the properties json at the same time as the other items
# the json gets reset for some reason.
db.session.add(task_model)
db.session.commit()
@classmethod @classmethod
def _create_task( def _create_task(
cls, cls,

View File

@ -144,7 +144,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
path: `${apiPath}/${modifiedProcessModelId}/${params.process_instance_id}${queryParams}`, path: `${apiPath}/${modifiedProcessModelId}/${params.process_instance_id}${queryParams}`,
successCallback: setProcessInstance, successCallback: setProcessInstance,
}); });
let taskParams = '?all_tasks=true&most_recent_tasks_only=true'; let taskParams = '?most_recent_tasks_only=true';
if (typeof params.to_task_guid !== 'undefined') { if (typeof params.to_task_guid !== 'undefined') {
taskParams = `${taskParams}&to_task_guid=${params.to_task_guid}`; taskParams = `${taskParams}&to_task_guid=${params.to_task_guid}`;
} }