This commit is contained in:
Dan 2023-05-09 15:02:05 -04:00
parent 7be5bf43fd
commit 30a26de38d
6 changed files with 2340 additions and 2461 deletions

View File

@@ -7,18 +7,18 @@ function error_handler() {
trap 'error_handler ${LINENO} $?' ERR
set -o errtrace -o errexit -o nounset -o pipefail
mysql -uroot spiffworkflow_backend_local_development -e '
mysql -uroot spiffworkflow_backend_unit_testing -e '
select * from process_instance;
select t.guid as task_guid, t.state as task_state, td.bpmn_identifier as task_id from task t
join task_definition td on td.id = t.task_definition_id
where process_instance_id=(select max(id) from process_instance) and td.bpmn_identifier = "top_level_subprocess";
where process_instance_id=(select max(id) from process_instance);
select bp.guid as bp_guid, bpd.bpmn_identifier as bp_identifier from bpmn_process bp
join bpmn_process_definition bpd on bpd.id = bp.bpmn_process_definition_id
join bpmn_process bpb on bpb.id = bp.direct_parent_process_id
join process_instance pi on bpb.id = pi.bpmn_process_id
where bpd.bpmn_identifier = "top_level_subprocess" and pi.id = (select max(id) from process_instance);
where pi.id = (select max(id) from process_instance);
'
# mysql -uroot spiffworkflow_backend_local_development -e '\
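This script now points at the unit-testing database and dumps every task for the most recent process instance instead of only the one matching `top_level_subprocess`. For reference, a rough equivalent of the task query from a flask shell session; the model import paths and the `task_definition` relationship are assumptions based on code elsewhere in this commit, not confirmed by this diff:

# Hypothetical flask-shell equivalent of the script's task query.
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel

latest_id = db.session.query(db.func.max(ProcessInstanceModel.id)).scalar()
for task in TaskModel.query.filter_by(process_instance_id=latest_id).all():
    # task_definition is the ORM relationship behind the SQL join on task_definition_id
    print(task.guid, task.state, task.task_definition.bpmn_identifier)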

File diff suppressed because it is too large

View File

@@ -27,9 +27,9 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"}
# SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" }
sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
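The dependency swap above trades the pinned git revision of SpiffWorkflow for an editable checkout two directories up, so local library changes are picked up without reinstalling. A quick sanity check that the editable copy is the one actually being imported (assuming the project's virtualenv is active):

# Verify the editable ../../SpiffWorkflow checkout is what gets imported.
import SpiffWorkflow
print(SpiffWorkflow.__file__)  # should resolve inside the local checkout, not site-packages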

View File

@@ -415,6 +415,7 @@ class ProcessInstanceProcessor:
        self.setup_processor_with_process_instance(
            process_instance_model=process_instance_model, validate_only=validate_only
        )
        self.initialization_time = time.time()

    def setup_processor_with_process_instance(
        self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
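`initialization_time` records when the processor was constructed. One plausible use, mirroring the `start_time` pattern in `reset_process` below, is to identify spiff tasks whose state changed after the processor came up; this is a sketch under that assumption, not code from this commit:

# Sketch only: find tasks touched since the processor was constructed, assuming
# SpiffWorkflow tasks expose last_state_change as a float timestamp (the
# reset_process code below relies on the same attribute).
def tasks_changed_since_init(processor: "ProcessInstanceProcessor") -> list:
    return [
        task
        for task in processor.bpmn_process_instance.get_tasks()
        if task.last_state_change > processor.initialization_time
    ]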
@@ -1206,136 +1207,43 @@ class ProcessInstanceProcessor:
    @classmethod
    def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
        """Reset a process to an earlier state."""
        # raise Exception("This feature to reset a process instance to a given task is currently unavailable")
        start_time = time.time()

        # Log the event that we are moving back to a previous task.
        ProcessInstanceTmpService.add_event_to_process_instance(
            process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
        )

        to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
        if to_task_model is None:
            raise TaskNotFoundError(
                f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
            )

        # If this task model has a parent boundary event, reset to that point instead,
        # so we can reset all the boundary timers, etc...
        parent_id = to_task_model.properties_json.get("parent", "")
        parent = TaskModel.query.filter_by(guid=parent_id).first()
        is_boundary_parent = False
        if parent and parent.task_definition.typename == "_BoundaryEventParent":
            to_task_model = parent
            is_boundary_parent = True  # Will need to complete this task at the end so we are on the correct process.
        # NOTE: run ALL queries before making changes to ensure we get everything before anything changes
        parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
            to_task_model
        )
        task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
        parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes]

        tasks_to_update_query = db.session.query(TaskModel).filter(
            and_(
                or_(
                    TaskModel.end_in_seconds > to_task_model.end_in_seconds,
                    TaskModel.end_in_seconds.is_(None),  # type: ignore
                ),
                TaskModel.process_instance_id == process_instance.id,
                TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids),  # type: ignore
            )
        )
        tasks_to_update = tasks_to_update_query.all()
        tasks_to_update_guids = [t.guid for t in tasks_to_update]

        tasks_to_delete_query = db.session.query(TaskModel).filter(
            and_(
                or_(
                    TaskModel.end_in_seconds > to_task_model.end_in_seconds,
                    TaskModel.end_in_seconds.is_(None),  # type: ignore
                ),
                TaskModel.process_instance_id == process_instance.id,
                TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids),  # type: ignore
                TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids),  # type: ignore
            )
        )
        tasks_to_delete = tasks_to_delete_query.all()
        tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
        tasks_to_delete_ids = [t.id for t in tasks_to_delete]

        # delete bpmn processes that are also tasks that we either deleted or will update.
        # this is to force spiff to recreate those bpmn processes with the correct associated task guids.
        bpmn_processes_to_delete_query = db.session.query(BpmnProcessModel).filter(
            or_(
                BpmnProcessModel.guid.in_(tasks_to_delete_guids),  # type: ignore
                and_(
                    BpmnProcessModel.guid.in_(tasks_to_update_guids),  # type: ignore
                    BpmnProcessModel.id.not_in(parent_bpmn_processes_ids),  # type: ignore
                ),
            )
        )
        bpmn_processes_to_delete = bpmn_processes_to_delete_query.order_by(
            BpmnProcessModel.id.desc()  # type: ignore
        ).all()

        # delete any human task that was for a task that we deleted since they will get recreated later.
        human_tasks_to_delete = HumanTaskModel.query.filter(
            HumanTaskModel.task_model_id.in_(tasks_to_delete_ids)  # type: ignore
        ).all()

        # ensure the correct order for foreign keys
        for human_task_to_delete in human_tasks_to_delete:
            db.session.delete(human_task_to_delete)
        for task_to_delete in tasks_to_delete:
            db.session.delete(task_to_delete)
        for bpmn_process_to_delete in bpmn_processes_to_delete:
            db.session.delete(bpmn_process_to_delete)
        related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
        if related_human_task is not None:
            db.session.delete(related_human_task)

        tasks_to_update_ids = [t.id for t in tasks_to_update]
        human_tasks_to_delete = HumanTaskModel.query.filter(
            HumanTaskModel.task_model_id.in_(tasks_to_update_ids)  # type: ignore
        ).all()
        for human_task_to_delete in human_tasks_to_delete:
            db.session.delete(human_task_to_delete)

        for task_to_update in tasks_to_update:
            TaskService.reset_task_model(task_to_update, state="FUTURE")
        db.session.bulk_save_objects(tasks_to_update)
        parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
        if parent_task_model is None:
            raise TaskNotFoundError(
                f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
            )

        TaskService.reset_task_model(
            to_task_model,
            state="READY",
            json_data_hash=parent_task_model.json_data_hash,
            python_env_data_hash=parent_task_model.python_env_data_hash,
        )
        db.session.add(to_task_model)
        for task_model in task_models_of_parent_bpmn_processes:
            TaskService.reset_task_model(task_model, state="WAITING")
        db.session.bulk_save_objects(task_models_of_parent_bpmn_processes)

        bpmn_process = to_task_model.bpmn_process
        properties_json = copy.copy(bpmn_process.properties_json)
        properties_json["last_task"] = parent_task_model.guid
        bpmn_process.properties_json = properties_json
        db.session.add(bpmn_process)
        db.session.commit()
        processor = ProcessInstanceProcessor(process_instance)
        processor.bpmn_process_instance.reset_task_from_id(UUID(to_task_guid))

        # If this was a boundary event parent, run it so we get back to an active task.
        if is_boundary_parent:
            processor.do_engine_steps(execution_strategy_name="one_at_a_time")

        spiff_tasks_updated = {}
        for task in processor.bpmn_process_instance.get_tasks():
            if task.last_state_change > start_time:
                spiff_tasks_updated[str(task.id)] = task

        # Remove any human tasks that were updated.
        human_tasks_to_clear = HumanTaskModel.query.filter(
            HumanTaskModel.task_id.in_(list(spiff_tasks_updated.keys()))  # type: ignore
        ).all()
        for record in human_tasks_to_clear:
            db.session.delete(record)

        task_service = TaskService(
            process_instance=processor.process_instance_model,
            serializer=processor._serializer,
            bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
        )
        for _task_id, spiff_task in spiff_tasks_updated.items():
            task_service.update_task_model_with_spiff_task(spiff_task)
        task_service.save_objects_to_database()

        # Why can't we just do this?
        processor.save()
        processor.suspend()
    @staticmethod
    def get_parser() -> MyCustomParser:
        """Get_parser."""

View File

@@ -575,30 +575,6 @@ class TaskService:
task_model["start_in_seconds"] = None
task_model["end_in_seconds"] = None
@classmethod
def reset_task_model(
cls,
task_model: TaskModel,
state: str,
json_data_hash: Optional[str] = None,
python_env_data_hash: Optional[str] = None,
) -> None:
if json_data_hash is None:
cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "json_data_hash")
else:
task_model.json_data_hash = json_data_hash
if python_env_data_hash is None:
cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "python_env_data")
else:
task_model.python_env_data_hash = python_env_data_hash
task_model.state = state
task_model.start_in_seconds = None
task_model.end_in_seconds = None
new_properties_json = copy.copy(task_model.properties_json)
new_properties_json["state"] = getattr(TaskState, state)
task_model.properties_json = new_properties_json
@classmethod
def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict:
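`reset_task_model` is deleted because the reset flow no longer rewrites task state by hand. Worth noting what it had to keep consistent: the model's `state` column holds the state name, while the serialized `properties_json["state"]` holds SpiffWorkflow's numeric value, looked up with `getattr(TaskState, state)`. A minimal illustration; the `TaskState` import path is an assumption for this sketch:

from SpiffWorkflow.task import TaskState  # import path assumed for illustration

state_name = "READY"                        # what the model column stored
numeric_state = getattr(TaskState, state_name)  # what properties_json["state"] stored
print(state_name, numeric_state)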

View File

@@ -469,6 +469,7 @@ class TestProcessInstanceProcessor(BaseTest):
        processor.suspend()
        processor = ProcessInstanceProcessor(process_instance)
        ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))

        process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
        human_task_one = process_instance.active_human_tasks[0]
        assert human_task_one.task_title == "Manual Task #1"
        processor = ProcessInstanceProcessor(process_instance)
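The one line this hunk adds is the re-query of `process_instance`. It matters because `reset_process` now constructs its own processor and commits its own session work, so the instance object the test already holds can be stale. A hedged sketch of the general pattern:

# Re-read the row after an operation that committed in another unit of work,
# so relationship collections like active_human_tasks reflect the reset.
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
assert process_instance is not None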