mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-01-13 02:54:27 +00:00
Merge pull request #252 from sartography/bug/improve_reset_to_previous_task
Bug/improve reset to previous task
This commit is contained in:
commit
08bb92402e
@ -7,18 +7,18 @@ function error_handler() {
|
|||||||
trap 'error_handler ${LINENO} $?' ERR
|
trap 'error_handler ${LINENO} $?' ERR
|
||||||
set -o errtrace -o errexit -o nounset -o pipefail
|
set -o errtrace -o errexit -o nounset -o pipefail
|
||||||
|
|
||||||
mysql -uroot spiffworkflow_backend_local_development -e '
|
mysql -uroot spiffworkflow_backend_unit_testing -e '
|
||||||
select * from process_instance;
|
select * from process_instance;
|
||||||
|
|
||||||
select t.guid as task_guid, t.state as task_state, td.bpmn_identifier as task_id from task t
|
select t.guid as task_guid, t.state as task_state, td.bpmn_identifier as task_id from task t
|
||||||
join task_definition td on td.id = t.task_definition_id
|
join task_definition td on td.id = t.task_definition_id
|
||||||
where process_instance_id=(select max(id) from process_instance) and td.bpmn_identifier = "top_level_subprocess";
|
where process_instance_id=(select max(id) from process_instance);
|
||||||
|
|
||||||
select bp.guid as bp_guid, bpd.bpmn_identifier as bp_identifier from bpmn_process bp
|
select bp.guid as bp_guid, bpd.bpmn_identifier as bp_identifier from bpmn_process bp
|
||||||
join bpmn_process_definition bpd on bpd.id = bp.bpmn_process_definition_id
|
join bpmn_process_definition bpd on bpd.id = bp.bpmn_process_definition_id
|
||||||
join bpmn_process bpb on bpb.id = bp.direct_parent_process_id
|
join bpmn_process bpb on bpb.id = bp.direct_parent_process_id
|
||||||
join process_instance pi on bpb.id = pi.bpmn_process_id
|
join process_instance pi on bpb.id = pi.bpmn_process_id
|
||||||
where bpd.bpmn_identifier = "top_level_subprocess" and pi.id = (select max(id) from process_instance);
|
where pi.id = (select max(id) from process_instance);
|
||||||
'
|
'
|
||||||
|
|
||||||
# mysql -uroot spiffworkflow_backend_local_development -e '\
|
# mysql -uroot spiffworkflow_backend_local_development -e '\
|
||||||
|
@ -29,7 +29,7 @@ flask-restful = "*"
|
|||||||
werkzeug = "*"
|
werkzeug = "*"
|
||||||
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
|
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
|
||||||
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
|
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
|
||||||
# SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
|
# SpiffWorkflow = {develop = true, path = "../../spiffworkflow/" }
|
||||||
sentry-sdk = "^1.10"
|
sentry-sdk = "^1.10"
|
||||||
sphinx-autoapi = "^2.0"
|
sphinx-autoapi = "^2.0"
|
||||||
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
|
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
|
||||||
|
@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None:
|
|||||||
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
|
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
|
||||||
database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
|
database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
|
||||||
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
|
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
|
||||||
app.config["SQLALCHEMY_DATABASE_URI"] = (
|
app.config[
|
||||||
f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
|
"SQLALCHEMY_DATABASE_URI"
|
||||||
)
|
] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
|
||||||
elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
|
elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
|
||||||
app.config["SQLALCHEMY_DATABASE_URI"] = (
|
app.config[
|
||||||
f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
|
"SQLALCHEMY_DATABASE_URI"
|
||||||
)
|
] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
|
||||||
else:
|
else:
|
||||||
# use pswd to trick flake8 with hardcoded passwords
|
# use pswd to trick flake8 with hardcoded passwords
|
||||||
db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")
|
db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")
|
||||||
|
@ -129,9 +129,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
|||||||
def serialized_with_metadata(self) -> dict[str, Any]:
|
def serialized_with_metadata(self) -> dict[str, Any]:
|
||||||
process_instance_attributes = self.serialized
|
process_instance_attributes = self.serialized
|
||||||
process_instance_attributes["process_metadata"] = self.process_metadata
|
process_instance_attributes["process_metadata"] = self.process_metadata
|
||||||
process_instance_attributes["process_model_with_diagram_identifier"] = (
|
process_instance_attributes[
|
||||||
self.process_model_with_diagram_identifier
|
"process_model_with_diagram_identifier"
|
||||||
)
|
] = self.process_model_with_diagram_identifier
|
||||||
return process_instance_attributes
|
return process_instance_attributes
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
# TODO: clean up this service for a clear distinction between it and the process_instance_service
|
# TODO: clean up this service for a clear distinction between it and the process_instance_service
|
||||||
# where this points to the pi service
|
# where this points to the pi service
|
||||||
import _strptime # type: ignore
|
import _strptime # type: ignore
|
||||||
import copy
|
|
||||||
import decimal
|
import decimal
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
@ -53,7 +52,6 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
|||||||
from SpiffWorkflow.task import TaskState
|
from SpiffWorkflow.task import TaskState
|
||||||
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
|
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
|
||||||
from sqlalchemy import and_
|
from sqlalchemy import and_
|
||||||
from sqlalchemy import or_
|
|
||||||
|
|
||||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||||
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
|
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
|
||||||
@ -423,9 +421,9 @@ class ProcessInstanceProcessor:
|
|||||||
tld.process_instance_id = process_instance_model.id
|
tld.process_instance_id = process_instance_model.id
|
||||||
|
|
||||||
# we want this to be the fully qualified path to the process model including all group subcomponents
|
# we want this to be the fully qualified path to the process model including all group subcomponents
|
||||||
current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = (
|
current_app.config[
|
||||||
f"{process_instance_model.process_model_identifier}"
|
"THREAD_LOCAL_DATA"
|
||||||
)
|
].process_model_identifier = f"{process_instance_model.process_model_identifier}"
|
||||||
|
|
||||||
self.process_instance_model = process_instance_model
|
self.process_instance_model = process_instance_model
|
||||||
self.process_model_service = ProcessModelService()
|
self.process_model_service = ProcessModelService()
|
||||||
@ -585,9 +583,9 @@ class ProcessInstanceProcessor:
|
|||||||
bpmn_subprocess_definition.bpmn_identifier
|
bpmn_subprocess_definition.bpmn_identifier
|
||||||
] = bpmn_process_definition_dict
|
] = bpmn_process_definition_dict
|
||||||
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
|
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
|
||||||
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
|
bpmn_subprocess_definition_bpmn_identifiers[
|
||||||
bpmn_subprocess_definition.bpmn_identifier
|
bpmn_subprocess_definition.id
|
||||||
)
|
] = bpmn_subprocess_definition.bpmn_identifier
|
||||||
|
|
||||||
task_definitions = TaskDefinitionModel.query.filter(
|
task_definitions = TaskDefinitionModel.query.filter(
|
||||||
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore
|
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore
|
||||||
@ -1199,140 +1197,27 @@ class ProcessInstanceProcessor:
|
|||||||
# Saving the workflow seems to reset the status
|
# Saving the workflow seems to reset the status
|
||||||
self.suspend()
|
self.suspend()
|
||||||
|
|
||||||
# FIXME: this currently cannot work for multi-instance tasks and loopback. It can somewhat for not those
|
|
||||||
# if we can properly handling resetting children tasks. Right now if we set them all to FUTURE then
|
|
||||||
# they never get picked up by spiff and processed. The process instance just stops after the to_task_guid
|
|
||||||
# and marks itself complete without processing any of the children.
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
|
def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
|
||||||
"""Reset a process to an earlier state."""
|
"""Reset a process to an earlier state."""
|
||||||
# raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
|
start_time = time.time()
|
||||||
|
|
||||||
|
# Log the event that we are moving back to a previous task.
|
||||||
ProcessInstanceTmpService.add_event_to_process_instance(
|
ProcessInstanceTmpService.add_event_to_process_instance(
|
||||||
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
|
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
|
||||||
)
|
)
|
||||||
|
|
||||||
to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
|
|
||||||
if to_task_model is None:
|
|
||||||
raise TaskNotFoundError(
|
|
||||||
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
|
|
||||||
)
|
|
||||||
# If this task model has a parent boundary event, reset to that point instead,
|
|
||||||
# so we can reset all the boundary timers, etc...
|
|
||||||
parent_id = to_task_model.properties_json.get("parent", "")
|
|
||||||
parent = TaskModel.query.filter_by(guid=parent_id).first()
|
|
||||||
is_boundary_parent = False
|
|
||||||
if parent and parent.task_definition.typename == "_BoundaryEventParent":
|
|
||||||
to_task_model = parent
|
|
||||||
is_boundary_parent = True # Will need to complete this task at the end so we are on the correct process.
|
|
||||||
|
|
||||||
# NOTE: run ALL queries before making changes to ensure we get everything before anything changes
|
|
||||||
parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
|
|
||||||
to_task_model
|
|
||||||
)
|
|
||||||
task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
|
|
||||||
parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes]
|
|
||||||
|
|
||||||
tasks_to_update_query = db.session.query(TaskModel).filter(
|
|
||||||
and_(
|
|
||||||
or_(
|
|
||||||
TaskModel.end_in_seconds > to_task_model.end_in_seconds,
|
|
||||||
TaskModel.end_in_seconds.is_(None), # type: ignore
|
|
||||||
),
|
|
||||||
TaskModel.process_instance_id == process_instance.id,
|
|
||||||
TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
|
|
||||||
)
|
|
||||||
)
|
|
||||||
tasks_to_update = tasks_to_update_query.all()
|
|
||||||
tasks_to_update_guids = [t.guid for t in tasks_to_update]
|
|
||||||
|
|
||||||
tasks_to_delete_query = db.session.query(TaskModel).filter(
|
|
||||||
and_(
|
|
||||||
or_(
|
|
||||||
TaskModel.end_in_seconds > to_task_model.end_in_seconds,
|
|
||||||
TaskModel.end_in_seconds.is_(None), # type: ignore
|
|
||||||
),
|
|
||||||
TaskModel.process_instance_id == process_instance.id,
|
|
||||||
TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
|
|
||||||
TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
|
|
||||||
)
|
|
||||||
)
|
|
||||||
tasks_to_delete = tasks_to_delete_query.all()
|
|
||||||
tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
|
|
||||||
tasks_to_delete_ids = [t.id for t in tasks_to_delete]
|
|
||||||
|
|
||||||
# delete bpmn processes that are also tasks that we either deleted or will update.
|
|
||||||
# this is to force spiff to recreate those bpmn processes with the correct associated task guids.
|
|
||||||
bpmn_processes_to_delete_query = db.session.query(BpmnProcessModel).filter(
|
|
||||||
or_(
|
|
||||||
BpmnProcessModel.guid.in_(tasks_to_delete_guids), # type: ignore
|
|
||||||
and_(
|
|
||||||
BpmnProcessModel.guid.in_(tasks_to_update_guids), # type: ignore
|
|
||||||
BpmnProcessModel.id.not_in(parent_bpmn_processes_ids), # type: ignore
|
|
||||||
),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
bpmn_processes_to_delete = bpmn_processes_to_delete_query.order_by(
|
|
||||||
BpmnProcessModel.id.desc() # type: ignore
|
|
||||||
).all()
|
|
||||||
|
|
||||||
# delete any human task that was for a task that we deleted since they will get recreated later.
|
|
||||||
human_tasks_to_delete = HumanTaskModel.query.filter(
|
|
||||||
HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
|
|
||||||
).all()
|
|
||||||
|
|
||||||
# ensure the correct order for foreign keys
|
|
||||||
for human_task_to_delete in human_tasks_to_delete:
|
|
||||||
db.session.delete(human_task_to_delete)
|
|
||||||
for task_to_delete in tasks_to_delete:
|
|
||||||
db.session.delete(task_to_delete)
|
|
||||||
for bpmn_process_to_delete in bpmn_processes_to_delete:
|
|
||||||
db.session.delete(bpmn_process_to_delete)
|
|
||||||
|
|
||||||
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
|
|
||||||
if related_human_task is not None:
|
|
||||||
db.session.delete(related_human_task)
|
|
||||||
|
|
||||||
tasks_to_update_ids = [t.id for t in tasks_to_update]
|
|
||||||
human_tasks_to_delete = HumanTaskModel.query.filter(
|
|
||||||
HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
|
|
||||||
).all()
|
|
||||||
for human_task_to_delete in human_tasks_to_delete:
|
|
||||||
db.session.delete(human_task_to_delete)
|
|
||||||
|
|
||||||
for task_to_update in tasks_to_update:
|
|
||||||
TaskService.reset_task_model(task_to_update, state="FUTURE")
|
|
||||||
db.session.bulk_save_objects(tasks_to_update)
|
|
||||||
|
|
||||||
parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
|
|
||||||
if parent_task_model is None:
|
|
||||||
raise TaskNotFoundError(
|
|
||||||
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
|
|
||||||
)
|
|
||||||
|
|
||||||
TaskService.reset_task_model(
|
|
||||||
to_task_model,
|
|
||||||
state="READY",
|
|
||||||
json_data_hash=parent_task_model.json_data_hash,
|
|
||||||
python_env_data_hash=parent_task_model.python_env_data_hash,
|
|
||||||
)
|
|
||||||
db.session.add(to_task_model)
|
|
||||||
for task_model in task_models_of_parent_bpmn_processes:
|
|
||||||
TaskService.reset_task_model(task_model, state="WAITING")
|
|
||||||
db.session.bulk_save_objects(task_models_of_parent_bpmn_processes)
|
|
||||||
|
|
||||||
bpmn_process = to_task_model.bpmn_process
|
|
||||||
properties_json = copy.copy(bpmn_process.properties_json)
|
|
||||||
properties_json["last_task"] = parent_task_model.guid
|
|
||||||
bpmn_process.properties_json = properties_json
|
|
||||||
db.session.add(bpmn_process)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
|
||||||
|
spiff_tasks = processor.bpmn_process_instance.get_tasks()
|
||||||
|
|
||||||
# If this as a boundary event parent, run it, so we get back to an active task.
|
task_service = TaskService(
|
||||||
if is_boundary_parent:
|
process_instance=processor.process_instance_model,
|
||||||
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
|
serializer=processor._serializer,
|
||||||
|
bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
|
||||||
|
)
|
||||||
|
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)
|
||||||
|
|
||||||
|
# Save the process
|
||||||
processor.save()
|
processor.save()
|
||||||
processor.suspend()
|
processor.suspend()
|
||||||
|
|
||||||
|
@ -602,31 +602,6 @@ class TaskService:
|
|||||||
task_model["start_in_seconds"] = None
|
task_model["start_in_seconds"] = None
|
||||||
task_model["end_in_seconds"] = None
|
task_model["end_in_seconds"] = None
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def reset_task_model(
|
|
||||||
cls,
|
|
||||||
task_model: TaskModel,
|
|
||||||
state: str,
|
|
||||||
json_data_hash: Optional[str] = None,
|
|
||||||
python_env_data_hash: Optional[str] = None,
|
|
||||||
) -> None:
|
|
||||||
if json_data_hash is None:
|
|
||||||
cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "json_data_hash")
|
|
||||||
else:
|
|
||||||
task_model.json_data_hash = json_data_hash
|
|
||||||
if python_env_data_hash is None:
|
|
||||||
cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "python_env_data")
|
|
||||||
else:
|
|
||||||
task_model.python_env_data_hash = python_env_data_hash
|
|
||||||
|
|
||||||
task_model.state = state
|
|
||||||
task_model.start_in_seconds = None
|
|
||||||
task_model.end_in_seconds = None
|
|
||||||
|
|
||||||
new_properties_json = copy.copy(task_model.properties_json)
|
|
||||||
new_properties_json["state"] = getattr(TaskState, state)
|
|
||||||
task_model.properties_json = new_properties_json
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict:
|
def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict:
|
||||||
task_definition = task_model.task_definition
|
task_definition = task_model.task_definition
|
||||||
|
@ -469,10 +469,11 @@ class TestProcessInstanceProcessor(BaseTest):
|
|||||||
processor.suspend()
|
processor.suspend()
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))
|
ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))
|
||||||
|
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
|
||||||
human_task_one = process_instance.active_human_tasks[0]
|
human_task_one = process_instance.active_human_tasks[0]
|
||||||
assert human_task_one.task_title == "Manual Task #1"
|
assert human_task_one.task_title == "Manual Task #1"
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
processor.manual_complete_task(str(spiff_manual_task.id), execute=True)
|
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
processor.resume()
|
processor.resume()
|
||||||
processor.do_engine_steps(save=True)
|
processor.do_engine_steps(save=True)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user