From 6643db9f2f1a7b3186cc401c5950c8b4ac75c744 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Thu, 18 Apr 2024 15:24:59 +0000 Subject: [PATCH] Pi file data on fs (#1403) * added basic support to store process instance file data on the file system w/ burnettk * the files need to be in a hashed directory structure w/ burnettk * fixed exception message w/ burnettk * use the same entry method to the contents of data file w/ burnettk * fixed hash method w/ burnettk * make hash method easier to read w/ burnettk * added a migrator and test for migration w/ burnettk * fix a couple things while testing --------- Co-authored-by: jasquat Co-authored-by: burnettk --- .../bin/data_migrations/run_all.py | 3 + .../spiffworkflow_backend/config/__init__.py | 7 ++ .../spiffworkflow_backend/config/default.py | 5 ++ .../process_instance_file_data_migrator.py | 16 ++++ .../models/process_instance_file_data.py | 44 +++++++++- .../routes/process_api_blueprint.py | 2 +- .../scripts/get_encoded_file_data.py | 2 +- .../services/process_instance_service.py | 4 +- .../integration/test_process_api.py | 85 +++++++++++++++++++ .../unit/test_process_instance_file_data.py | 18 ++++ ...est_process_instance_file_data_migrator.py | 64 ++++++++++++++ .../unit/test_process_instance_service.py | 2 +- 12 files changed, 247 insertions(+), 5 deletions(-) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_file_data_migrator.py create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data.py create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data_migrator.py diff --git a/spiffworkflow-backend/bin/data_migrations/run_all.py b/spiffworkflow-backend/bin/data_migrations/run_all.py index 0e888b9a2..61050b408 100644 --- a/spiffworkflow-backend/bin/data_migrations/run_all.py +++ b/spiffworkflow-backend/bin/data_migrations/run_all.py @@ -2,6 +2,7 @@ import time from flask import current_app from spiffworkflow_backend import create_app +from spiffworkflow_backend.data_migrations.process_instance_file_data_migrator import ProcessInstanceFileDataMigrator from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree from spiffworkflow_backend.data_migrations.version_2 import Version2 from spiffworkflow_backend.models.db import db @@ -105,6 +106,8 @@ def main() -> None: run_version_1() # this will run while using the new per instance on demand data migration framework # run_version_2(process_instances) + if app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None: + ProcessInstanceFileDataMigrator.migrate_from_database_to_filesystem() end_time = time.time() current_app.logger.debug( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py index 7656fb5a4..f6c3adb19 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py @@ -248,6 +248,13 @@ def setup_config(app: Flask) -> None: app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"] = "greedy" app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"] = "run_until_user_message" + if app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None: + if not os.path.isdir(app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"]): + raise ConfigurationError( + "Could not find the directory specified with SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH: " + f"{app.config['SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH']}" + ) + thread_local_data = threading.local() app.config["THREAD_LOCAL_DATA"] = thread_local_data _set_up_tenant_specific_fields_as_list_of_strings(app) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 6cb121b16..b41a93c95 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -191,6 +191,11 @@ config_from_env( ) config_from_env("SPIFFWORKFLOW_BACKEND_ENCRYPTION_KEY") + +### process instance file data +# if set then it will save files associated with process instances to this location +config_from_env("SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH") + ### locking # timeouts for process instances locks as they are run to avoid stale locks config_from_env("SPIFFWORKFLOW_BACKEND_ALLOW_CONFISCATING_LOCK_AFTER_SECONDS", default="600") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_file_data_migrator.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_file_data_migrator.py new file mode 100644 index 000000000..d48a658bb --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/process_instance_file_data_migrator.py @@ -0,0 +1,16 @@ +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.process_instance_file_data import PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM +from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel + + +class ProcessInstanceFileDataMigrator: + @classmethod + def migrate_from_database_to_filesystem(cls) -> None: + file_data = ProcessInstanceFileDataModel.query.filter( + ProcessInstanceFileDataModel.contents != PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode() + ).all() + + for file in file_data: + file.store_file_on_file_system() + db.session.add(file) + db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_file_data.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_file_data.py index 5a6fded16..5618e3857 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_file_data.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_file_data.py @@ -1,5 +1,7 @@ +import os from dataclasses import dataclass +from flask import current_app from sqlalchemy import ForeignKey from sqlalchemy.dialects.mysql import LONGBLOB @@ -7,6 +9,9 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM = "contents_in:filesystem" +PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT = 2 + @dataclass class ProcessInstanceFileDataModel(SpiffworkflowBaseDBModel): @@ -17,7 +22,44 @@ class ProcessInstanceFileDataModel(SpiffworkflowBaseDBModel): mimetype: str = db.Column(db.String(255), nullable=False) filename: str = db.Column(db.String(255), nullable=False) # this is not deferred because there is no reason to query this model if you do not want the contents - contents: str = db.Column(db.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=False) + contents: bytes = db.Column(db.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=False) digest: str = db.Column(db.String(64), nullable=False, index=True) updated_at_in_seconds: int = db.Column(db.Integer, nullable=False) created_at_in_seconds: int = db.Column(db.Integer, nullable=False) + + def get_contents(self) -> bytes: + file_contents = self.contents + if current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None: + file_contents = self.get_contents_on_file_system() + return file_contents + + def store_file_on_file_system(self) -> None: + filepath = self.get_full_filepath() + try: + os.makedirs(os.path.dirname(filepath)) + except FileExistsError: + pass + + with open(filepath, "wb") as f: + f.write(self.contents) + self.contents = PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode() + + def get_contents_on_file_system(self) -> bytes: + filepath = self.get_full_filepath() + with open(filepath) as f: + return f.read().encode() + + def get_full_filepath(self) -> str: + dir_parts = self.__class__.get_hashed_directory_structure(self.digest) + return os.path.join( + current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"], *dir_parts, self.digest + ) + + @classmethod + def get_hashed_directory_structure(cls, digest: str) -> list[str]: + dir_parts = [] + for ii in range(PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT): + start_index = ii * PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT + end_index = start_index + PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT + dir_parts.append(digest[start_index:end_index]) + return dir_parts diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py index f9db9bb51..b7fa53198 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py @@ -237,7 +237,7 @@ def process_data_file_download( ) mimetype = file_data.mimetype filename = file_data.filename - file_contents = file_data.contents + file_contents = file_data.get_contents() return Response( file_contents, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_encoded_file_data.py b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_encoded_file_data.py index 72aa103fd..d84327cbd 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_encoded_file_data.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_encoded_file_data.py @@ -32,7 +32,7 @@ class GetEncodedFileData(Script): process_instance_id=process_instance_id, ).first() - base64_value = base64.b64encode(file_data.contents).decode("ascii") + base64_value = base64.b64encode(file_data.get_contents()).decode("ascii") encoded_file_data = f"data:{file_data.mimetype};name={file_data.filename};base64,{base64_value}" return encoded_file_data diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 3ea5deebb..e4abe2979 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -388,7 +388,7 @@ class ProcessInstanceService: process_instance_id=process_instance_id, mimetype=mimetype, filename=filename, - contents=contents, # type: ignore + contents=contents, digest=digest, updated_at_in_seconds=now_in_seconds, created_at_in_seconds=now_in_seconds, @@ -443,6 +443,8 @@ class ProcessInstanceService: models = cls.replace_file_data_with_digest_references(data, process_instance_id) for model in models: + if current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None: + model.store_file_on_file_system() db.session.add(model) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 84e4a657b..510a387a1 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -17,6 +17,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType +from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel from spiffworkflow_backend.models.process_instance_report import ReportMetadata @@ -1552,6 +1553,90 @@ class TestProcessApi(BaseTest): assert response.status_code == 200 assert response.data == expected_content + def test_can_download_uploaded_file_from_file_system( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + # it's just for convenience, since the root_path gets deleted after every test + with self.app_config_mock( + app, "SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH", ProcessModelService.root_path() + ): + process_group_id = "test_message_send" + process_model_id = "message_sender" + bpmn_file_name = "message_sender.bpmn" + bpmn_file_location = "message_send_one_conversation" + process_model = self.create_group_and_model_with_bpmn( + client, + with_super_admin_user, + process_group_id=process_group_id, + process_model_id=process_model_id, + bpmn_file_name=bpmn_file_name, + bpmn_file_location=bpmn_file_location, + ) + + def _file_data(i: int, c: bytes) -> str: + b64 = base64.b64encode(c).decode() + return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}" + + def _digest_reference(i: int, sha: str) -> str: + b64 = f"{ProcessInstanceService.FILE_DATA_DIGEST_PREFIX}{sha}" + return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}" + + file_contents = [f"contents{i}".encode() for i in range(3)] + file_data = [_file_data(i, c) for i, c in enumerate(file_contents)] + digests = [sha256(c).hexdigest() for c in file_contents] + [_digest_reference(i, d) for i, d in enumerate(digests)] + + payload = { + "customer_id": "sartography", + "po_number": "1001", + "amount": "One Billion Dollars! Mwhahahahahaha", + "description": "But seriously.", + "file0": file_data[0], + "key": [{"file1": file_data[1]}], + "key2": {"key3": [{"key4": file_data[2], "key5": "bob"}]}, + } + + response = self.create_process_instance_from_process_model_id_with_api( + client, + process_model.id, + self.logged_in_headers(with_super_admin_user), + ) + assert response.json is not None + process_instance_id = response.json["id"] + + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + task = processor.get_all_user_tasks()[0] + human_task = process_instance.active_human_tasks[0] + + ProcessInstanceService.complete_form_task( + processor, + task, + payload, + with_super_admin_user, + human_task, + ) + processor.save() + + for expected_content, digest in zip(file_contents, digests, strict=True): + response = client.get( + f"/v1.0/process-data-file-download/{self.modify_process_identifier_for_path_param(process_model.id)}/{process_instance_id}/{digest}", + headers=self.logged_in_headers(with_super_admin_user), + ) + assert response.status_code == 200 + assert response.data == expected_content + + dir_parts = ProcessInstanceFileDataModel.get_hashed_directory_structure(digest) + filepath = os.path.join( + app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"], *dir_parts, digest + ) + assert os.path.isfile(filepath) + def test_process_instance_can_be_terminated( self, app: Flask, diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data.py new file mode 100644 index 000000000..c8d0499de --- /dev/null +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data.py @@ -0,0 +1,18 @@ +import hashlib + +from flask.app import Flask +from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel + +from tests.spiffworkflow_backend.helpers.base_test import BaseTest + + +class TestProcessInstanceFileData(BaseTest): + def test_returns_correct_dir_structure_from_digest( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + digest = hashlib.sha256(b"OH YEAH").hexdigest() + digest_parts = ProcessInstanceFileDataModel.get_hashed_directory_structure(digest) + assert digest == "b65b894bb56d8cf56e1045bbac80ea1d313640f7ee3ee724f43b2a07be5bff5f" + assert digest_parts == ["b6", "5b"] diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data_migrator.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data_migrator.py new file mode 100644 index 000000000..1c1b97956 --- /dev/null +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_file_data_migrator.py @@ -0,0 +1,64 @@ +import hashlib +import json + +from flask.app import Flask +from flask.testing import FlaskClient +from spiffworkflow_backend.data_migrations.process_instance_file_data_migrator import ProcessInstanceFileDataMigrator +from spiffworkflow_backend.models.db import db +from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_file_data import PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM +from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel +from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.process_model_service import ProcessModelService + +from tests.spiffworkflow_backend.helpers.base_test import BaseTest +from tests.spiffworkflow_backend.helpers.test_data import load_test_spec + + +class TestProcessInstanceFileDataMigrator(BaseTest): + def test_can_migrate_from_db_to_fs( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_model = load_test_spec( + process_model_id="test_group/random_fact", + bpmn_file_name="random_fact_set.bpmn", + process_model_source_directory="random_fact", + ) + process_instance = self.create_process_instance_from_process_model(process_model=process_model) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + assert process_instance.status == ProcessInstanceStatus.complete.value + + test_file_one_contents = json.dumps({"hello": "hey"}) + test_file_one_digest = hashlib.sha256(test_file_one_contents.encode()).hexdigest() + pi_files = [ + { + "mimetype": "application/json", + "filename": "test_file_one.json", + "contents": test_file_one_contents, + "digest": test_file_one_digest, + } + ] + for pi_file in pi_files: + pi_model = ProcessInstanceFileDataModel( + process_instance_id=process_instance.id, + mimetype=pi_file["mimetype"], + filename=pi_file["filename"], + contents=pi_file["contents"].encode(), + digest=pi_file["digest"], + ) + db.session.add(pi_model) + db.session.commit() + + with self.app_config_mock( + app, "SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH", ProcessModelService.root_path() + ): + ProcessInstanceFileDataMigrator.migrate_from_database_to_filesystem() + + test_file_one_model = ProcessInstanceFileDataModel.query.filter_by(filename="test_file_one.json").first() + assert test_file_one_model is not None + assert test_file_one_model.contents == PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode() + assert test_file_one_model.get_contents() == test_file_one_contents.encode() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py index a314b68c6..1fe4b6bb4 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_service.py @@ -98,7 +98,7 @@ class TestProcessInstanceService(BaseTest): assert model.process_instance_id == process_instance_id assert model.mimetype == "some/mimetype" assert model.filename == f"testing{i}.txt" - assert model.contents == _file_content(i) # type: ignore + assert model.contents == _file_content(i) assert model.digest == _digest(i) def test_does_not_skip_events_it_does_not_know_about(self) -> None: