From bb99d9429072bd9866d6809e16d5feb5d19a335a Mon Sep 17 00:00:00 2001 From: jasquat Date: Mon, 23 Jan 2023 16:45:07 -0500 Subject: [PATCH] added locking system for process instances so hopefully background jobs will not take instances currently being run by the user w/ burnettk --- .flake8 | 7 ++- bin/save_to_secrets_from_file | 33 ++++++++++ .../{22212a7d6505_.py => 49aae41d7992_.py} | 8 ++- src/spiffworkflow_backend/config/__init__.py | 3 + src/spiffworkflow_backend/config/default.py | 4 ++ .../models/process_instance.py | 3 + .../routes/process_instances_controller.py | 3 + .../routes/tasks_controller.py | 2 + .../services/process_instance_processor.py | 60 ++++++++++++++++++- .../services/process_instance_service.py | 14 +++++ .../unit/test_process_instance_processor.py | 47 +++++++++++++++ 11 files changed, 178 insertions(+), 6 deletions(-) create mode 100644 bin/save_to_secrets_from_file rename migrations/versions/{22212a7d6505_.py => 49aae41d7992_.py} (98%) diff --git a/.flake8 b/.flake8 index 9c3596e8..b42cf528 100644 --- a/.flake8 +++ b/.flake8 @@ -8,11 +8,14 @@ rst-roles = class,const,func,meth,mod,ref rst-directives = deprecated per-file-ignores = + # More specific globs seem to overwrite the more generic ones so we have + # to split them out by directory + # asserts are ok in tests - tests/*:S101 + tests/*:S101,D102,D103 # prefer naming functions descriptively rather than forcing comments - *:D102 + src/*:D102,D103 bin/keycloak_test_server.py:B950,D conftest.py:S105 diff --git a/bin/save_to_secrets_from_file b/bin/save_to_secrets_from_file new file mode 100644 index 00000000..39f71602 --- /dev/null +++ b/bin/save_to_secrets_from_file @@ -0,0 +1,33 @@ +"""Read KEY=VALUE lines from the given env file and save each one as a secret.""" +import os +import sys + +from spiffworkflow_backend import create_app +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.user import UserModel +from 
spiffworkflow_backend.services.secret_service import SecretService + + +def main(env_file: str): + """Main.""" + os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "development" + if os.environ.get("BPMN_SPEC_ABSOLUTE_DIR") is None: + os.environ["BPMN_SPEC_ABSOLUTE_DIR"] = "hey" + flask_env_key = "FLASK_SESSION_SECRET_KEY" + os.environ[flask_env_key] = "whatevs" + app = create_app() + with app.app_context(): + contents = None + with open(env_file, 'r') as f: + contents = f.readlines() + for line in contents: + key, value_raw = line.split('=') + value = value_raw.replace('"', '') + SecretService().add_secret(key, value, UserModel.query.first().id) + + + +if len(sys.argv) < 2: + raise Exception("env file must be specified") + +main(sys.argv[1]) diff --git a/migrations/versions/22212a7d6505_.py b/migrations/versions/49aae41d7992_.py similarity index 98% rename from migrations/versions/22212a7d6505_.py rename to migrations/versions/49aae41d7992_.py index f662f583..52b352a2 100644 --- a/migrations/versions/22212a7d6505_.py +++ b/migrations/versions/49aae41d7992_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 22212a7d6505 +Revision ID: 49aae41d7992 Revises: -Create Date: 2023-01-23 10:59:17.365694 +Create Date: 2023-01-23 14:23:17.989042 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. 
-revision = '22212a7d6505' +revision = '49aae41d7992' down_revision = None branch_labels = None depends_on = None @@ -129,6 +129,8 @@ def upgrade(): sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True), sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True), sa.Column('spiff_step', sa.Integer(), nullable=True), + sa.Column('locked_by', sa.String(length=80), nullable=True), + sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ), sa.PrimaryKeyConstraint('id') ) diff --git a/src/spiffworkflow_backend/config/__init__.py b/src/spiffworkflow_backend/config/__init__.py index 759ac339..f9f19571 100644 --- a/src/spiffworkflow_backend/config/__init__.py +++ b/src/spiffworkflow_backend/config/__init__.py @@ -1,6 +1,7 @@ """__init__.py.""" import os import threading +import uuid from flask.app import Flask from werkzeug.utils import ImportStringError @@ -96,6 +97,8 @@ def setup_config(app: Flask) -> None: if app.config["BPMN_SPEC_ABSOLUTE_DIR"] is None: raise ConfigurationError("BPMN_SPEC_ABSOLUTE_DIR config must be set") + app.config["PROCESS_UUID"] = uuid.uuid4() + setup_database_uri(app) setup_logger(app) diff --git a/src/spiffworkflow_backend/config/default.py b/src/spiffworkflow_backend/config/default.py index db0baeba..252b2b89 100644 --- a/src/spiffworkflow_backend/config/default.py +++ b/src/spiffworkflow_backend/config/default.py @@ -82,3 +82,7 @@ SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID = environ.get( "SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID", default="Message_SystemMessageNotification", ) + +ALLOW_CONFISCATING_LOCK_AFTER_SECONDS = int( + environ.get("ALLOW_CONFISCATING_LOCK_AFTER_SECONDS", default="600") +) diff --git a/src/spiffworkflow_backend/models/process_instance.py b/src/spiffworkflow_backend/models/process_instance.py index 75a58004..9e075600 100644 --- a/src/spiffworkflow_backend/models/process_instance.py +++ 
b/src/spiffworkflow_backend/models/process_instance.py @@ -88,6 +88,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): bpmn_version_control_identifier: str = db.Column(db.String(255)) spiff_step: int = db.Column(db.Integer) + locked_by: str | None = db.Column(db.String(80)) + locked_at_in_seconds: int | None = db.Column(db.Integer) + @property def serialized(self) -> dict[str, Any]: """Return object data in serializeable format.""" diff --git a/src/spiffworkflow_backend/routes/process_instances_controller.py b/src/spiffworkflow_backend/routes/process_instances_controller.py index 065150aa..1271f929 100644 --- a/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -102,6 +102,7 @@ def process_instance_run( ) processor = ProcessInstanceProcessor(process_instance) + processor.lock_process_instance("Web") if do_engine_steps: try: @@ -118,6 +119,8 @@ def process_instance_run( status_code=400, task=task, ) from e + finally: + processor.unlock_process_instance("Web") if not current_app.config["RUN_BACKGROUND_SCHEDULER"]: MessageService.process_message_instances() diff --git a/src/spiffworkflow_backend/routes/tasks_controller.py b/src/spiffworkflow_backend/routes/tasks_controller.py index 3126674e..c206e4da 100644 --- a/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/src/spiffworkflow_backend/routes/tasks_controller.py @@ -371,6 +371,7 @@ def task_submit( ) ) + processor.lock_process_instance("Web") ProcessInstanceService.complete_form_task( processor=processor, spiff_task=spiff_task, @@ -378,6 +379,7 @@ def task_submit( user=g.user, human_task=human_task, ) + processor.unlock_process_instance("Web") # If we need to update all tasks, then get the next ready task and if it a multi-instance with the same # task spec, complete that form as well. 
diff --git a/src/spiffworkflow_backend/services/process_instance_processor.py b/src/spiffworkflow_backend/services/process_instance_processor.py index 8ca1e36f..0889c06e 100644 --- a/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/src/spiffworkflow_backend/services/process_instance_processor.py @@ -69,6 +69,7 @@ from SpiffWorkflow.spiff.serializer.task_spec_converters import UserTaskConverte from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore +from sqlalchemy import text from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.db import db @@ -141,6 +142,14 @@ class MissingProcessInfoError(Exception): """MissingProcessInfoError.""" +class ProcessInstanceIsAlreadyLockedError(Exception): + pass + + +class ProcessInstanceLockedBySomethingElseError(Exception): + pass + + class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore """This is a custom script processor that can be easily injected into Spiff Workflow. 
@@ -761,6 +770,10 @@ class ProcessInstanceProcessor: complete_states = [TaskState.CANCELLED, TaskState.COMPLETED] user_tasks = list(self.get_all_user_tasks()) self.process_instance_model.status = self.get_status().value + current_app.logger.debug( + f"the_status: {self.process_instance_model.status} for instance" + f" {self.process_instance_model.id}" + ) self.process_instance_model.total_tasks = len(user_tasks) self.process_instance_model.completed_tasks = sum( 1 for t in user_tasks if t.state in complete_states @@ -1142,7 +1155,52 @@ class ProcessInstanceProcessor: def get_status(self) -> ProcessInstanceStatus: """Get_status.""" - return self.status_of(self.bpmn_process_instance) + the_status = self.status_of(self.bpmn_process_instance) + # current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}") + return the_status + + def lock_process_instance(self, lock_prefix: str) -> None: + locked_by = f"{lock_prefix}_{current_app.config['PROCESS_UUID']}" + current_time_in_seconds = round(time.time()) + lock_expiry_in_seconds = ( + current_time_in_seconds + - current_app.config["ALLOW_CONFISCATING_LOCK_AFTER_SECONDS"] + ) + + query_text = text( + "UPDATE process_instance SET locked_at_in_seconds =" + " :current_time_in_seconds, locked_by = :locked_by where id = :id AND" + " (locked_by IS NULL OR locked_at_in_seconds < :lock_expiry_in_seconds);" + ).execution_options(autocommit=True) + result = db.engine.execute( + query_text, + id=self.process_instance_model.id, + current_time_in_seconds=current_time_in_seconds, + locked_by=locked_by, + lock_expiry_in_seconds=lock_expiry_in_seconds, + ) + # it seems like autocommit is working above (we see the statement in debug logs) but sqlalchemy doesn't + # seem to update properly so tell it to commit as well. + # if we omit this line then querying the record from a unit test doesn't ever show the record as locked. 
+ db.session.commit() + if result.rowcount < 1: + raise ProcessInstanceIsAlreadyLockedError( + f"Cannot lock process instance {self.process_instance_model.id}." + "It has already been locked." + ) + + def unlock_process_instance(self, lock_prefix: str) -> None: + locked_by = f"{lock_prefix}_{current_app.config['PROCESS_UUID']}" + if self.process_instance_model.locked_by != locked_by: + raise ProcessInstanceLockedBySomethingElseError( + f"Cannot unlock process instance {self.process_instance_model.id}." + f"It locked by {self.process_instance_model.locked_by}" + ) + + self.process_instance_model.locked_by = None + self.process_instance_model.locked_at_in_seconds = None + db.session.add(self.process_instance_model) + db.session.commit() # messages have one correlation key (possibly wrong) # correlation keys may have many correlation properties diff --git a/src/spiffworkflow_backend/services/process_instance_service.py b/src/spiffworkflow_backend/services/process_instance_service.py index cf86def9..d3cb30a9 100644 --- a/src/spiffworkflow_backend/services/process_instance_service.py +++ b/src/spiffworkflow_backend/services/process_instance_service.py @@ -20,6 +20,9 @@ from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitService +from spiffworkflow_backend.services.process_instance_processor import ( + ProcessInstanceIsAlreadyLockedError, +) from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) @@ -74,12 +77,18 @@ class ProcessInstanceService: .all() ) for process_instance in records: + locked = False + processor = None try: current_app.logger.info( f"Processing process_instance {process_instance.id}" ) processor = ProcessInstanceProcessor(process_instance) + processor.lock_process_instance("Web") + locked = 
True processor.do_engine_steps(save=True) + except ProcessInstanceIsAlreadyLockedError: + continue except Exception as e: db.session.rollback() # in case the above left the database with a bad transaction process_instance.status = ProcessInstanceStatus.error.value @@ -91,6 +100,9 @@ class ProcessInstanceService: + f"({process_instance.process_model_identifier}). {str(e)}" ) current_app.logger.error(error_message) + finally: + if locked and processor: + processor.unlock_process_instance("Web") @staticmethod def processor_to_process_instance_api( @@ -220,6 +232,8 @@ class ProcessInstanceService: spiff_task.update_data(dot_dct) # ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store. processor.complete_task(spiff_task, human_task, user=user) + + # maybe move this out once we have the interstitial page since this is here just so we can get the next human task processor.do_engine_steps(save=True) @staticmethod diff --git a/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index b4a650dc..690d6ca5 100644 --- a/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -7,12 +7,19 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from spiffworkflow_backend.models.group import GroupModel +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import ( UserDoesNotHaveAccessToTaskError, ) +from spiffworkflow_backend.services.process_instance_processor import ( + 
ProcessInstanceIsAlreadyLockedError, +) +from spiffworkflow_backend.services.process_instance_processor import ( + ProcessInstanceLockedBySomethingElseError, +) from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) @@ -293,3 +300,43 @@ class TestProcessInstanceProcessor(BaseTest): assert len(process_instance.active_human_tasks) == 1 assert initial_human_task_id == process_instance.active_human_tasks[0].id + + def test_it_can_lock_and_unlock_a_process_instance( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + initiator_user = self.find_or_create_user("initiator_user") + process_model = load_test_spec( + process_model_id="test_group/model_with_lanes", + bpmn_file_name="lanes_with_owner_dict.bpmn", + process_model_source_directory="model_with_lanes", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=initiator_user + ) + processor = ProcessInstanceProcessor(process_instance) + assert process_instance.locked_by is None + assert process_instance.locked_at_in_seconds is None + processor.lock_process_instance("TEST") + + process_instance = ProcessInstanceModel.query.filter_by( + id=process_instance.id + ).first() + assert process_instance.locked_by is not None + assert process_instance.locked_at_in_seconds is not None + + with pytest.raises(ProcessInstanceIsAlreadyLockedError): + processor.lock_process_instance("TEST") + + with pytest.raises(ProcessInstanceLockedBySomethingElseError): + processor.unlock_process_instance("TEST2") + + processor.unlock_process_instance("TEST") + + process_instance = ProcessInstanceModel.query.filter_by( + id=process_instance.id + ).first() + assert process_instance.locked_by is None + assert process_instance.locked_at_in_seconds is None