Hotfix/user task with timer cancel (#533)

* cherry picked changes from b12af9f3bc625a6b12cfa0b8d908b378b6be9442 to pin form json files

* use the class name to determine what a task type is w/ burnettk

* initial thoughts to fix cancel timer issue w/ burnettk

* added migration to run predict on all open instances w/ burnettk

* remove debug, refactor data migrations, add benchmark_log_func

* log progress of script

* only process predicted tasks and their parents in the version 2 data miagration w/ burnettk

* added data migrator and using that to run version 2 migrations when needed w/ burnettk

* removed some unwanted code

* fix issue, but tests still need updating

* fix tests by returning code to closer to what it was

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
jasquat 2023-10-10 11:17:09 -04:00 committed by GitHub
parent dd6dcdcec9
commit ceb06cc227
10 changed files with 216 additions and 9 deletions

View File

@ -93,7 +93,7 @@ fi
# DELETE after this runs on all necessary environments # DELETE after this runs on all necessary environments
# TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations # TODO: make a system somewhat like schema migrations (storing versions in a db table) to handle data migrations
poetry run python ./bin/data_migrations/version_1_3.py poetry run python ./bin/data_migrations/run_all.py
# --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh # --worker-class is not strictly necessary, since setting threads will automatically set the worker class to gthread, but meh
export IS_GUNICORN="true" export IS_GUNICORN="true"

View File

@ -0,0 +1,85 @@
import time
from flask import current_app
from spiffworkflow_backend import create_app
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
from spiffworkflow_backend.data_migrations.version_2 import Version2
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from sqlalchemy import update
# simple decorator to time the func
# https://stackoverflow.com/a/11151365/6090676, thank you
def benchmark_log_func(func):
"""
decorator to calculate the total time of a func
"""
def st_func(*args, **kwargs):
t1 = time.time()
r = func(*args, **kwargs)
t2 = time.time()
# __qualname__, i know you use it every day. but if not, it's the function name prefixed with any qualifying class names
current_app.logger.debug(f"Function={func.__qualname__}, Time={t2 - t1}")
return r
return st_func
@benchmark_log_func
def put_serializer_version_onto_numeric_track() -> None:
old_busted_serializer_version = "1.0-spiffworkflow-backend"
update_query = (
update(ProcessInstanceModel)
.where(ProcessInstanceModel.spiff_serializer_version == old_busted_serializer_version)
.values(spiff_serializer_version="1")
)
db.session.execute(update_query)
db.session.commit()
def all_potentially_relevant_process_instances() -> list[ProcessInstanceModel]:
return ProcessInstanceModel.query.filter(
ProcessInstanceModel.spiff_serializer_version < Version2.VERSION,
ProcessInstanceModel.status.in_(ProcessInstanceModel.non_terminal_statuses()),
).all()
@benchmark_log_func
def run_version_1() -> None:
VersionOneThree().run() # make this a class method
@benchmark_log_func
def run_version_2(process_instances: list[ProcessInstanceModel]) -> None:
Version2.run(process_instances)
def main() -> None:
start_time = time.time()
app = create_app()
end_time = time.time()
with app.app_context():
current_app.logger.debug(f"data_migrations/run_all::create_app took {end_time - start_time} seconds")
start_time = time.time()
put_serializer_version_onto_numeric_track()
process_instances = all_potentially_relevant_process_instances()
potentially_relevant_instance_count = len(process_instances)
current_app.logger.debug(
f"Found potentially relevant process_instances: {potentially_relevant_instance_count}"
)
if potentially_relevant_instance_count > 0:
run_version_1()
# this will run while using the new per instance on demand data migration framework
# run_version_2(process_instances)
end_time = time.time()
current_app.logger.debug(
f"done running data migrations in ./bin/data_migrations/run_all.py. took {end_time - start_time} seconds"
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
function error_handler() {
>&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
exit "$2"
}
trap 'error_handler ${LINENO} $?' ERR
set -o errtrace -o errexit -o nounset -o pipefail
script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
. "${script_dir}/local_development_environment_setup"
SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP=false
poetry run python ./bin/data_migrations/run_all.py

View File

@ -0,0 +1,53 @@
import time
from typing import Any
from flask import current_app
from spiffworkflow_backend.data_migrations.version_2 import Version2
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
# simple decorator to time the func
# https://stackoverflow.com/a/11151365/6090676, thank you
def benchmark_log_func(func: Any) -> Any:
"""
decorator to calculate the total time of a func
"""
def st_func(*args: list, **kwargs: dict) -> Any:
t1 = time.time()
r = func(*args, **kwargs)
t2 = time.time()
# __qualname__, i know you use it every day. but if not, it's the function name prefixed with any qualifying class names
current_app.logger.debug(f"Function={func.__qualname__}, Time={t2 - t1}")
return r
return st_func
class ProcessInstanceMigrator:
@classmethod
@benchmark_log_func
def run_version_2(cls, process_instance: ProcessInstanceModel) -> None:
if process_instance.spiff_serializer_version < Version2.VERSION:
Version2.run(process_instance)
process_instance.spiff_serializer_version = Version2.VERSION
db.session.add(process_instance)
db.session.commit()
@classmethod
def run(cls, process_instance: ProcessInstanceModel) -> None:
"""This updates the serialization of an instance to the current expected state.
We do not run the migrator in cases where we do not expect to update the spiff internal state,
such as the process instance show page (where we do instantiate a processor).
Someday, we might need to run the migrator in more places, if using spiff to read depends on
an updated serialization.
"""
# if the serializer version is None, then we are dealing with a new process instance,
# so we do not need to run the migrator
if process_instance.spiff_serializer_version is None:
return
cls.run_version_2(process_instance)

View File

@ -0,0 +1,39 @@
import time
from flask import current_app
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.task_service import TaskService
class Version2:
VERSION = "2"
@classmethod
def run(cls, process_instance: ProcessInstanceModel) -> None:
initial_time = time.time()
try:
processor = ProcessInstanceProcessor(process_instance)
processor.bpmn_process_instance._predict()
spiff_tasks = processor.bpmn_process_instance.get_tasks(updated_ts=initial_time)
task_service = TaskService(
process_instance, processor._serializer, processor.bpmn_definition_to_task_definitions_mappings
)
# implicit begin db transaction
for spiff_task in spiff_tasks:
cls.update_spiff_task_parents(spiff_task, task_service)
task_service.save_objects_to_database()
except Exception as ex:
current_app.logger.warning(
f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}"
)
@classmethod
def update_spiff_task_parents(cls, spiff_task: SpiffTask, task_service: TaskService) -> None:
task_service.update_task_model_with_spiff_task(spiff_task)
if spiff_task.parent is not None:
cls.update_spiff_task_parents(spiff_task.parent, task_service)

View File

@ -147,6 +147,7 @@ def _run_extension(
processor = None processor = None
try: try:
# this is only creates new process instances so no need to worry about process instance migrations
processor = ProcessInstanceProcessor( processor = ProcessInstanceProcessor(
process_instance, process_instance,
script_engine=CustomBpmnScriptEngine(use_restricted_script_engine=False), script_engine=CustomBpmnScriptEngine(use_restricted_script_engine=False),

View File

@ -12,6 +12,7 @@ from sqlalchemy import and_
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
@ -158,10 +159,11 @@ def process_instance_terminate(
modified_process_model_identifier: str, modified_process_model_identifier: str,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
processor = ProcessInstanceProcessor(process_instance)
try: try:
with ProcessInstanceQueueService.dequeued(process_instance): with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
processor = ProcessInstanceProcessor(process_instance)
processor.terminate() processor.terminate()
except ( except (
ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsNotEnqueuedError,
@ -660,9 +662,10 @@ def send_bpmn_event(
def _send_bpmn_event(process_instance: ProcessInstanceModel, body: dict) -> Response: def _send_bpmn_event(process_instance: ProcessInstanceModel, body: dict) -> Response:
processor = ProcessInstanceProcessor(process_instance)
try: try:
with ProcessInstanceQueueService.dequeued(process_instance): with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
processor = ProcessInstanceProcessor(process_instance)
processor.send_bpmn_event(body) processor.send_bpmn_event(body)
except ( except (
ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsNotEnqueuedError,

View File

@ -26,6 +26,7 @@ from sqlalchemy import func
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from sqlalchemy.orm.util import AliasedClass from sqlalchemy.orm.util import AliasedClass
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError
@ -271,6 +272,8 @@ def manual_complete_task(
execute = body.get("execute", True) execute = body.get("execute", True)
process_instance = ProcessInstanceModel.query.filter(ProcessInstanceModel.id == process_instance_id).first() process_instance = ProcessInstanceModel.query.filter(ProcessInstanceModel.id == process_instance_id).first()
if process_instance: if process_instance:
with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.manual_complete_task(task_guid, execute, g.user) processor.manual_complete_task(task_guid, execute, g.user)
else: else:
@ -629,6 +632,7 @@ def _dequeued_interstitial_stream(
try: try:
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance): if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
with ProcessInstanceQueueService.dequeued(process_instance): with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks) yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
except ProcessInstanceIsAlreadyLockedError: except ProcessInstanceIsAlreadyLockedError:
yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True) yield from _interstitial_stream(process_instance, execute_tasks=False, is_locked=True)
@ -771,6 +775,14 @@ def _task_submit_shared(
status_code=400, status_code=400,
) )
# we're dequeing twice in this function.
# tried to wrap the whole block in one dequeue, but that has the confusing side-effect that every exception
# in the block causes the process instance to go into an error state. for example, when
# AuthorizationService.assert_user_can_complete_task raises. this would have been solvable, but this seems simpler,
# and the cost is not huge given that this function is not the most common code path in the world.
with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
processor = ProcessInstanceProcessor( processor = ProcessInstanceProcessor(
process_instance, workflow_completed_handler=ProcessInstanceService.schedule_next_process_model_cycle process_instance, workflow_completed_handler=ProcessInstanceService.schedule_next_process_model_cycle
) )
@ -993,10 +1005,8 @@ def _prepare_form_data(
def _get_spiff_task_from_process_instance( def _get_spiff_task_from_process_instance(
task_guid: str, task_guid: str,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
processor: ProcessInstanceProcessor | None = None, processor: ProcessInstanceProcessor,
) -> SpiffTask: ) -> SpiffTask:
if processor is None:
processor = ProcessInstanceProcessor(process_instance)
task_uuid = uuid.UUID(task_guid) task_uuid = uuid.UUID(task_guid)
spiff_task = processor.bpmn_process_instance.get_task_from_id(task_uuid) spiff_task = processor.bpmn_process_instance.get_task_from_id(task_uuid)

View File

@ -387,7 +387,7 @@ IdToBpmnProcessSpecMapping = NewType("IdToBpmnProcessSpecMapping", dict[str, Bpm
class ProcessInstanceProcessor: class ProcessInstanceProcessor:
_default_script_engine = CustomBpmnScriptEngine() _default_script_engine = CustomBpmnScriptEngine()
SERIALIZER_VERSION = "1.0-spiffworkflow-backend" SERIALIZER_VERSION = "2"
wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(SPIFF_SPEC_CONFIG) wf_spec_converter = BpmnWorkflowSerializer.configure_workflow_spec_converter(SPIFF_SPEC_CONFIG)
_serializer = BpmnWorkflowSerializer(wf_spec_converter, version=SERIALIZER_VERSION) _serializer = BpmnWorkflowSerializer(wf_spec_converter, version=SERIALIZER_VERSION)

View File

@ -16,6 +16,7 @@ from SpiffWorkflow.bpmn.specs.event_definitions.timer import TimerEventDefinitio
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.task import TaskState # type: ignore from SpiffWorkflow.util.task import TaskState # type: ignore
from spiffworkflow_backend import db from spiffworkflow_backend import db
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError from spiffworkflow_backend.exceptions.error import HumanTaskNotFoundError
@ -71,6 +72,7 @@ class ProcessInstanceService:
@staticmethod @staticmethod
def next_start_event_configuration(process_instance_model: ProcessInstanceModel) -> StartConfiguration: def next_start_event_configuration(process_instance_model: ProcessInstanceModel) -> StartConfiguration:
try: try:
# this is only called from create_process_instance so no need to worry about process instance migrations
processor = ProcessInstanceProcessor(process_instance_model) processor = ProcessInstanceProcessor(process_instance_model)
start_configuration = WorkflowService.next_start_event_configuration( start_configuration = WorkflowService.next_start_event_configuration(
processor.bpmn_process_instance, datetime.now(timezone.utc) processor.bpmn_process_instance, datetime.now(timezone.utc)
@ -259,6 +261,7 @@ class ProcessInstanceService:
) -> ProcessInstanceProcessor | None: ) -> ProcessInstanceProcessor | None:
processor = None processor = None
with ProcessInstanceQueueService.dequeued(process_instance): with ProcessInstanceQueueService.dequeued(process_instance):
ProcessInstanceMigrator.run(process_instance)
processor = ProcessInstanceProcessor( processor = ProcessInstanceProcessor(
process_instance, workflow_completed_handler=cls.schedule_next_process_model_cycle process_instance, workflow_completed_handler=cls.schedule_next_process_model_cycle
) )