From ceb06cc22785438f52d32d394f08e669375ef9bc Mon Sep 17 00:00:00 2001
From: jasquat <2487833+jasquat@users.noreply.github.com>
Date: Tue, 10 Oct 2023 11:17:09 -0400
Subject: [PATCH] Hotfix/user task with timer cancel (#533)

* cherry picked changes from b12af9f3bc625a6b12cfa0b8d908b378b6be9442 to pin form json files

* use the class name to determine what a task type is w/ burnettk

* initial thoughts to fix cancel timer issue w/ burnettk

* added migration to run predict on all open instances w/ burnettk

* remove debug, refactor data migrations, add benchmark_log_func

* log progress of script

* only process predicted tasks and their parents in the version 2 data migration w/ burnettk

* added data migrator and using that to run version 2 migrations when needed w/ burnettk

* removed some unwanted code

* fix issue, but tests still need updating

* fix tests by returning code closer to what it was

---------

Co-authored-by: jasquat
Co-authored-by: burnettk
---
 .../bin/boot_server_in_docker               |  2 +-
 .../bin/data_migrations/run_all.py          | 85 +++++++++++++++++++
 .../bin/run_data_migrations_locally         | 13 +++
 .../process_instance_migrator.py            | 53 ++++++++++++
 .../data_migrations/version_2.py            | 39 +++++++++
 .../routes/extensions_controller.py         |  1 +
 .../routes/process_instances_controller.py  |  7 +-
 .../routes/tasks_controller.py              | 20 +++--
 .../services/process_instance_processor.py  |  2 +-
 .../services/process_instance_service.py    |  3 +
 10 files changed, 216 insertions(+), 9 deletions(-)
 create mode 100644 spiffworkflow-backend/bin/data_migrations/run_all.py
 create mode 100755 spiffworkflow-backend/bin/run_data_migrations_locally
 create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py
 create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py

diff --git a/spiffworkflow-backend/bin/boot_server_in_docker b/spiffworkflow-backend/bin/boot_server_in_docker
index 35cd08d65..393adc35e 100755
--- a/spiffworkflow-backend/bin/boot_server_in_docker
+++ b/spiffworkflow-backend/bin/boot_server_in_docker
@@ -93,7 +93,7 @@ fi
 
 # DELETE after this runs on all necessary environments
 # TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations
-poetry run python ./bin/data_migrations/version_1_3.py
+poetry run python ./bin/data_migrations/run_all.py
 
 # --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh
 export IS_GUNICORN="true"
diff --git a/spiffworkflow-backend/bin/data_migrations/run_all.py b/spiffworkflow-backend/bin/data_migrations/run_all.py
new file mode 100644
index 000000000..132720ea4
--- /dev/null
+++ b/spiffworkflow-backend/bin/data_migrations/run_all.py
@@ -0,0 +1,85 @@
+import time
+
+from flask import current_app
+from spiffworkflow_backend import create_app
+from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
+from spiffworkflow_backend.data_migrations.version_2 import Version2
+from spiffworkflow_backend.models.db import db
+from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
+from sqlalchemy import update
+
+
+# simple decorator to time the func
+# https://stackoverflow.com/a/11151365/6090676, thank you
+def benchmark_log_func(func):
+    """
+    decorator to calculate the total time of a func
+    """
+
+    def st_func(*args, **kwargs):
+        t1 = time.time()
+        r = func(*args, **kwargs)
+        t2 = time.time()
+        # __qualname__, i know you use it every day. but if not, it's the function name prefixed with any qualifying class names
+        current_app.logger.debug(f"Function={func.__qualname__}, Time={t2 - t1}")
+        return r
+
+    return st_func
+
+
+@benchmark_log_func
+def put_serializer_version_onto_numeric_track() -> None:
+    old_busted_serializer_version = "1.0-spiffworkflow-backend"
+    update_query = (
+        update(ProcessInstanceModel)
+        .where(ProcessInstanceModel.spiff_serializer_version == old_busted_serializer_version)
+        .values(spiff_serializer_version="1")
+    )
+    db.session.execute(update_query)
+    db.session.commit()
+
+
+def all_potentially_relevant_process_instances() -> list[ProcessInstanceModel]:
+    return ProcessInstanceModel.query.filter(
+        ProcessInstanceModel.spiff_serializer_version < Version2.VERSION,
+        ProcessInstanceModel.status.in_(ProcessInstanceModel.non_terminal_statuses()),
+    ).all()
+
+
+@benchmark_log_func
+def run_version_1() -> None:
+    VersionOneThree().run()  # make this a class method
+
+
+@benchmark_log_func
+def run_version_2(process_instances: list[ProcessInstanceModel]) -> None:
+    Version2.run(process_instances)
+
+
+def main() -> None:
+    start_time = time.time()
+    app = create_app()
+    end_time = time.time()
+
+    with app.app_context():
+        current_app.logger.debug(f"data_migrations/run_all::create_app took {end_time - start_time} seconds")
+        start_time = time.time()
+        put_serializer_version_onto_numeric_track()
+        process_instances = all_potentially_relevant_process_instances()
+        potentially_relevant_instance_count = len(process_instances)
+        current_app.logger.debug(
+            f"Found potentially relevant process_instances: {potentially_relevant_instance_count}"
+        )
+        if potentially_relevant_instance_count > 0:
+            run_version_1()
+            # version 2 now runs per instance, on demand, via the new data migration framework, so it is skipped here
+            # run_version_2(process_instances)
+
+    end_time = time.time()
+    current_app.logger.debug(
+        f"done running data migrations in ./bin/data_migrations/run_all.py. took {end_time - start_time} seconds"
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/spiffworkflow-backend/bin/run_data_migrations_locally b/spiffworkflow-backend/bin/run_data_migrations_locally
new file mode 100755
index 000000000..15fa24fe7
--- /dev/null
+++ b/spiffworkflow-backend/bin/run_data_migrations_locally
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+function error_handler() {
+  >&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
+  exit "$2"
+}
+trap 'error_handler ${LINENO} $?' ERR
+set -o errtrace -o errexit -o nounset -o pipefail
+
+script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
"${script_dir}/local_development_environment_setup" +SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP=false +poetry run python ./bin/data_migrations/run_all.py diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py new file mode 100644 index 000000000..caaefbb1c --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_migrator.py @@ -0,0 +1,53 @@ +import time +from typing import Any + +from flask import current_app +from spiffworkflow_backend.data_migrations.version_2 import Version2 +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel + + +# simple decorator to time the func +# https://stackoverflow.com/a/11151365/6090676, thank you +def benchmark_log_func(func: Any) -> Any: + """ + decorator to calculate the total time of a func + """ + + def st_func(*args: list, **kwargs: dict) -> Any: + t1 = time.time() + r = func(*args, **kwargs) + t2 = time.time() + # __qualname__, i know you use it every day. but if not, it's the function name prefixed with any qualifying class names + current_app.logger.debug(f"Function={func.__qualname__}, Time={t2 - t1}") + return r + + return st_func + + +class ProcessInstanceMigrator: + @classmethod + @benchmark_log_func + def run_version_2(cls, process_instance: ProcessInstanceModel) -> None: + if process_instance.spiff_serializer_version < Version2.VERSION: + Version2.run(process_instance) + process_instance.spiff_serializer_version = Version2.VERSION + db.session.add(process_instance) + db.session.commit() + + @classmethod + def run(cls, process_instance: ProcessInstanceModel) -> None: + """This updates the serialization of an instance to the current expected state. + + We do not run the migrator in cases where we do not expect to update the spiff internal state, + such as the process instance show page (where we do instantiate a processor). + Someday, we might need to run the migrator in more places, if using spiff to read depends on + an updated serialization. 
+ """ + + # if the serializer version is None, then we are dealing with a new process instance, + # so we do not need to run the migrator + if process_instance.spiff_serializer_version is None: + return + + cls.run_version_2(process_instance) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py new file mode 100644 index 000000000..f55e67228 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py @@ -0,0 +1,39 @@ +import time + +from flask import current_app +from SpiffWorkflow.task import Task as SpiffTask # type: ignore +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.task_service import TaskService + + +class Version2: + VERSION = "2" + + @classmethod + def run(cls, process_instance: ProcessInstanceModel) -> None: + initial_time = time.time() + try: + processor = ProcessInstanceProcessor(process_instance) + processor.bpmn_process_instance._predict() + + spiff_tasks = processor.bpmn_process_instance.get_tasks(updated_ts=initial_time) + task_service = TaskService( + process_instance, processor._serializer, processor.bpmn_definition_to_task_definitions_mappings + ) + + # implicit begin db transaction + for spiff_task in spiff_tasks: + cls.update_spiff_task_parents(spiff_task, task_service) + + task_service.save_objects_to_database() + except Exception as ex: + current_app.logger.warning( + f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}" + ) + + @classmethod + def update_spiff_task_parents(cls, spiff_task: SpiffTask, task_service: TaskService) -> None: + task_service.update_task_model_with_spiff_task(spiff_task) + if spiff_task.parent is not None: + cls.update_spiff_task_parents(spiff_task.parent, task_service) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py index 21a94d5d5..999910d1c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py @@ -147,6 +147,7 @@ def _run_extension( processor = None try: + # this is only creates new process instances so no need to worry about process instance migrations processor = ProcessInstanceProcessor( process_instance, script_engine=CustomBpmnScriptEngine(use_restricted_script_engine=False), diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index bc5af50e6..d0674020a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -12,6 +12,7 @@ from sqlalchemy import and_ from sqlalchemy import or_ from sqlalchemy.orm import aliased +from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel @@ -158,10 +159,11 @@ def 
     modified_process_model_identifier: str,
 ) -> flask.wrappers.Response:
     process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
-    processor = ProcessInstanceProcessor(process_instance)
 
     try:
         with ProcessInstanceQueueService.dequeued(process_instance):
+            ProcessInstanceMigrator.run(process_instance)
+            processor = ProcessInstanceProcessor(process_instance)
             processor.terminate()
     except (
         ProcessInstanceIsNotEnqueuedError,
@@ -660,9 +662,10 @@ def send_bpmn_event(
 
 
 def _send_bpmn_event(process_instance: ProcessInstanceModel, body: dict) -> Response:
-    processor = ProcessInstanceProcessor(process_instance)
     try:
         with ProcessInstanceQueueService.dequeued(process_instance):
+            ProcessInstanceMigrator.run(process_instance)
+            processor = ProcessInstanceProcessor(process_instance)
             processor.send_bpmn_event(body)
     except (
         ProcessInstanceIsNotEnqueuedError,
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index b24264c00..9e1db21de 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -26,6 +26,7 @@ from sqlalchemy import func
 from sqlalchemy.orm import aliased
 from sqlalchemy.orm.util import AliasedClass
 
+from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
 from spiffworkflow_backend.exceptions.api_error import ApiError
 from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
 from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError
@@ -271,8 +272,10 @@ def manual_complete_task(
     execute = body.get("execute", True)
     process_instance = ProcessInstanceModel.query.filter(ProcessInstanceModel.id == process_instance_id).first()
     if process_instance:
-        processor = ProcessInstanceProcessor(process_instance)
-        processor.manual_complete_task(task_guid, execute, g.user)
+        with ProcessInstanceQueueService.dequeued(process_instance):
+            ProcessInstanceMigrator.run(process_instance)
+            processor = ProcessInstanceProcessor(process_instance)
+            processor.manual_complete_task(task_guid, execute, g.user)
     else:
         raise ApiError(
             error_code="complete_task",
@@ -629,6 +632,7 @@ def _dequeued_interstitial_stream(
     try:
         if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
             with ProcessInstanceQueueService.dequeued(process_instance):
+                ProcessInstanceMigrator.run(process_instance)
                 yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
     except ProcessInstanceIsAlreadyLockedError:
         yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True)
@@ -771,6 +775,14 @@ def _task_submit_shared(
             status_code=400,
         )
 
+    # we're dequeuing twice in this function.
+    # tried to wrap the whole block in one dequeue, but that has the confusing side-effect that every exception
+    # in the block causes the process instance to go into an error state. for example, when
+    # AuthorizationService.assert_user_can_complete_task raises. this would have been solvable, but this seems simpler,
+    # and the cost is not huge given that this function is not the most common code path in the world.
+ with ProcessInstanceQueueService.dequeued(process_instance): + ProcessInstanceMigrator.run(process_instance) + processor = ProcessInstanceProcessor( process_instance, workflow_completed_handler=ProcessInstanceService.schedule_next_process_model_cycle ) @@ -993,10 +1005,8 @@ def _prepare_form_data( def _get_spiff_task_from_process_instance( task_guid: str, process_instance: ProcessInstanceModel, - processor: ProcessInstanceProcessor | None = None, + processor: ProcessInstanceProcessor, ) -> SpiffTask: - if processor is None: - processor = ProcessInstanceProcessor(process_instance) task_uuid = uuid.UUID(task_guid) spiff_task = processor.bpmn_process_instance.get_task_from_id(task_uuid) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index d3b5c830d..acd7691c7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -387,7 +387,7 @@ IdToBpmnProcessSpecMapping = NewType("IdToBpmnProcessSpecMapping", dict[str, Bpm class ProcessInstanceProcessor: _default_script_engine = CustomBpmnScriptEngine() - SERIALIZER_VERSION = "1.0-spiffworkflow-backend" + SERIALIZER_VERSION = "2" wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(SPIFF_SPEC_CONFIG) _serializer = BpmnWorkflowSerializer(wf_spec_converter, version=SERIALIZER_VERSION) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 7eb116244..451c41160 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -16,6 +16,7 @@ from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinitio from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.util.task import TaskState # type: ignore from spiffworkflow_backend import db +from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError @@ -71,6 +72,7 @@ class ProcessInstanceService: @staticmethod def next_start_event_configuration(process_instance_model: ProcessInstanceModel) -> StartConfiguration: try: + # this is only called from create_process_instance so no need to worry about process instance migrations processor = ProcessInstanceProcessor(process_instance_model) start_configuration = WorkflowService.next_start_event_configuration( processor.bpmn_process_instance, datetime.now(timezone.utc) @@ -259,6 +261,7 @@ class ProcessInstanceService: ) -> ProcessInstanceProcessor | None: processor = None with ProcessInstanceQueueService.dequeued(process_instance): + ProcessInstanceMigrator.run(process_instance) processor = ProcessInstanceProcessor( process_instance, workflow_completed_handler=cls.schedule_next_process_model_cycle )
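
Reviewer note on extending this: ProcessInstanceMigrator is written so per-version steps can chain, each gated on the instance's stored spiff_serializer_version. Below is a minimal sketch of how the next serializer bump might slot in. Version3, its VERSION string "3", and run_version_3 are hypothetical names invented for illustration (they are not part of this patch), and the @benchmark_log_func decorator is left off for brevity:

# hypothetical sketch, not part of this patch: a future Version3 migration
# chained onto ProcessInstanceMigrator using the same version gate that
# run_version_2 uses above
from spiffworkflow_backend.data_migrations.version_2 import Version2
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel


class Version3:  # assumed future migration, mirroring Version2's interface
    VERSION = "3"

    @classmethod
    def run(cls, process_instance: ProcessInstanceModel) -> None:
        pass  # the actual version 3 data migration would go here


class ProcessInstanceMigrator:
    @classmethod
    def run_version_2(cls, process_instance: ProcessInstanceModel) -> None:
        # as in this patch: only migrate instances still on an older version
        if process_instance.spiff_serializer_version < Version2.VERSION:
            Version2.run(process_instance)
            process_instance.spiff_serializer_version = Version2.VERSION
            db.session.add(process_instance)
            db.session.commit()

    @classmethod
    def run_version_3(cls, process_instance: ProcessInstanceModel) -> None:
        # same gate as run_version_2. note that spiff_serializer_version is
        # compared as a string, which holds up through "9", but a version "10"
        # would sort before "2" and would force a numeric scheme
        if process_instance.spiff_serializer_version < Version3.VERSION:
            Version3.run(process_instance)
            process_instance.spiff_serializer_version = Version3.VERSION
            db.session.add(process_instance)
            db.session.commit()

    @classmethod
    def run(cls, process_instance: ProcessInstanceModel) -> None:
        # a None serializer version means a brand new instance already at the
        # current version, so there is nothing to migrate
        if process_instance.spiff_serializer_version is None:
            return
        cls.run_version_2(process_instance)
        cls.run_version_3(process_instance)

Callers would not need to change: every entry point touched by this patch already wraps ProcessInstanceMigrator.run(process_instance) inside ProcessInstanceQueueService.dequeued before instantiating a processor, so any new version step is picked up automatically on the next access to each instance.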