Pi file data on fs (#1403)
* added basic support to store process instance file data on the file system w/ burnettk
* the files need to be in a hashed directory structure w/ burnettk
* fixed exception message w/ burnettk
* use the same entry method to get the contents of the data file w/ burnettk
* fixed hash method w/ burnettk
* made hash method easier to read w/ burnettk
* added a migrator and a test for the migration w/ burnettk
* fixed a couple of things found while testing
---------
Co-authored-by: jasquat <jasquat@users.noreply.github.com>
Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
parent
937f4a7e27
commit
6643db9f2f
|
@ -2,6 +2,7 @@ import time
|
|||
|
||||
from flask import current_app
|
||||
from spiffworkflow_backend import create_app
|
||||
from spiffworkflow_backend.data_migrations.process_instance_file_data_migrator import ProcessInstanceFileDataMigrator
|
||||
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
|
||||
from spiffworkflow_backend.data_migrations.version_2 import Version2
|
||||
from spiffworkflow_backend.models.db import db
|
||||
|
@ -105,6 +106,8 @@ def main() -> None:
|
|||
run_version_1()
|
||||
# this will run while using the new per instance on demand data migration framework
|
||||
# run_version_2(process_instances)
|
||||
if app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None:
|
||||
ProcessInstanceFileDataMigrator.migrate_from_database_to_filesystem()
|
||||
|
||||
end_time = time.time()
|
||||
current_app.logger.debug(
|
||||
|
|
|
@ -248,6 +248,13 @@ def setup_config(app: Flask) -> None:
|
|||
app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"] = "greedy"
|
||||
app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"] = "run_until_user_message"
|
||||
|
||||
if app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None:
|
||||
if not os.path.isdir(app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"]):
|
||||
raise ConfigurationError(
|
||||
"Could not find the directory specified with SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH: "
|
||||
f"{app.config['SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH']}"
|
||||
)
|
||||
|
||||
thread_local_data = threading.local()
|
||||
app.config["THREAD_LOCAL_DATA"] = thread_local_data
|
||||
_set_up_tenant_specific_fields_as_list_of_strings(app)
|
||||
|
|
|
@ -191,6 +191,11 @@ config_from_env(
|
|||
)
|
||||
config_from_env("SPIFFWORKFLOW_BACKEND_ENCRYPTION_KEY")
|
||||
|
||||
|
||||
### process instance file data
|
||||
# if set then it will save files associated with process instances to this location
|
||||
config_from_env("SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH")
|
||||
|
||||
### locking
|
||||
# timeouts for process instances locks as they are run to avoid stale locks
|
||||
config_from_env("SPIFFWORKFLOW_BACKEND_ALLOW_CONFISCATING_LOCK_AFTER_SECONDS", default="600")
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.process_instance_file_data import PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
|
||||
|
||||
class ProcessInstanceFileDataMigrator:
    """One-shot migrator that moves process instance file contents out of the db onto the file system."""

    @classmethod
    def migrate_from_database_to_filesystem(cls) -> None:
        """Write each db-stored file's contents to disk and replace the db column with the marker value.

        Rows whose contents already equal the filesystem marker are skipped, so re-running is safe.
        The single commit at the end persists the marker for every migrated row.
        """
        filesystem_marker = PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode()
        records = ProcessInstanceFileDataModel.query.filter(
            ProcessInstanceFileDataModel.contents != filesystem_marker
        ).all()

        for record in records:
            # store_file_on_file_system writes the bytes to disk and swaps contents for the marker
            record.store_file_on_file_system()
            db.session.add(record)
        db.session.commit()
|
|
@ -1,5 +1,7 @@
|
|||
import os
|
||||
from dataclasses import dataclass
|
||||
|
||||
from flask import current_app
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.dialects.mysql import LONGBLOB
|
||||
|
||||
|
@ -7,6 +9,9 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
|||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
|
||||
PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM = "contents_in:filesystem"
|
||||
PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT = 2
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessInstanceFileDataModel(SpiffworkflowBaseDBModel):
|
||||
|
@ -17,7 +22,44 @@ class ProcessInstanceFileDataModel(SpiffworkflowBaseDBModel):
|
|||
mimetype: str = db.Column(db.String(255), nullable=False)
|
||||
filename: str = db.Column(db.String(255), nullable=False)
|
||||
# this is not deferred because there is no reason to query this model if you do not want the contents
|
||||
contents: str = db.Column(db.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=False)
|
||||
contents: bytes = db.Column(db.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=False)
|
||||
digest: str = db.Column(db.String(64), nullable=False, index=True)
|
||||
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||
created_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||
|
||||
def get_contents(self) -> bytes:
    """Return this file's raw bytes, reading from the file system when that backing store is configured."""
    if current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is None:
        # no filesystem path configured: contents still live in the db column
        return self.contents
    return self.get_contents_on_file_system()
|
||||
|
||||
def store_file_on_file_system(self) -> None:
    """Write `contents` to its digest-derived location on disk, then replace the db contents with a marker.

    The caller is responsible for adding this model to the session and committing, so the
    marker is persisted together with the rest of the caller's transaction.
    """
    filepath = self.get_full_filepath()
    # exist_ok=True replaces the try/except FileExistsError dance and avoids the
    # check-then-create race when two writers target the same hashed directory.
    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    with open(filepath, "wb") as f:
        f.write(self.contents)
    # the marker tells get_contents to read from the file system instead of this column
    self.contents = PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode()
|
||||
|
||||
def get_contents_on_file_system(self) -> bytes:
    """Read this file's bytes back from its digest-derived location on disk.

    Opens in binary mode: the contents column is arbitrary binary data (LONGBLOB), so a
    text-mode read followed by .encode() — as previously written — would raise
    UnicodeDecodeError on non-UTF-8 data (images, pdfs) or corrupt it via the
    decode/encode round trip.
    """
    filepath = self.get_full_filepath()
    with open(filepath, "rb") as f:
        return f.read()
|
||||
|
||||
def get_full_filepath(self) -> str:
    """Absolute path where this file's contents live on disk, keyed by its sha256 digest."""
    base_dir = current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"]
    # fan out into hashed subdirectories so one directory never holds every file
    hashed_parts = type(self).get_hashed_directory_structure(self.digest)
    return os.path.join(base_dir, *hashed_parts, self.digest)
|
||||
|
||||
@classmethod
def get_hashed_directory_structure(cls, digest: str) -> list[str]:
    """Chunk the leading characters of *digest* into fixed-width directory segments.

    With the constant set to 2 this turns "b65b..." into ["b6", "5b"], spreading
    files across nested directories.
    """
    # NOTE: the same constant serves as both the number of segments and the width of
    # each segment, so the first count*count characters of the digest are consumed.
    width = PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT
    return [
        digest[part * width : part * width + width]
        for part in range(PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM_DIR_COUNT)
    ]
|
||||
|
|
|
@ -237,7 +237,7 @@ def process_data_file_download(
|
|||
)
|
||||
mimetype = file_data.mimetype
|
||||
filename = file_data.filename
|
||||
file_contents = file_data.contents
|
||||
file_contents = file_data.get_contents()
|
||||
|
||||
return Response(
|
||||
file_contents,
|
||||
|
|
|
@ -32,7 +32,7 @@ class GetEncodedFileData(Script):
|
|||
process_instance_id=process_instance_id,
|
||||
).first()
|
||||
|
||||
base64_value = base64.b64encode(file_data.contents).decode("ascii")
|
||||
base64_value = base64.b64encode(file_data.get_contents()).decode("ascii")
|
||||
encoded_file_data = f"data:{file_data.mimetype};name={file_data.filename};base64,{base64_value}"
|
||||
|
||||
return encoded_file_data
|
||||
|
|
|
@ -388,7 +388,7 @@ class ProcessInstanceService:
|
|||
process_instance_id=process_instance_id,
|
||||
mimetype=mimetype,
|
||||
filename=filename,
|
||||
contents=contents, # type: ignore
|
||||
contents=contents,
|
||||
digest=digest,
|
||||
updated_at_in_seconds=now_in_seconds,
|
||||
created_at_in_seconds=now_in_seconds,
|
||||
|
@ -443,6 +443,8 @@ class ProcessInstanceService:
|
|||
models = cls.replace_file_data_with_digest_references(data, process_instance_id)
|
||||
|
||||
for model in models:
|
||||
if current_app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"] is not None:
|
||||
model.store_file_on_file_system()
|
||||
db.session.add(model)
|
||||
db.session.commit()
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel
|
||||
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
|
||||
from spiffworkflow_backend.models.process_instance_report import ReportMetadata
|
||||
|
@ -1552,6 +1553,90 @@ class TestProcessApi(BaseTest):
|
|||
assert response.status_code == 200
|
||||
assert response.data == expected_content
|
||||
|
||||
def test_can_download_uploaded_file_from_file_system(
    self,
    app: Flask,
    client: FlaskClient,
    with_db_and_bpmn_file_cleanup: None,
    with_super_admin_user: UserModel,
) -> None:
    """Uploaded form files land on the file system (hashed dirs) and download intact via the API."""
    # it's just for convenience, since the root_path gets deleted after every test
    with self.app_config_mock(
        app, "SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH", ProcessModelService.root_path()
    ):
        process_group_id = "test_message_send"
        process_model_id = "message_sender"
        bpmn_file_name = "message_sender.bpmn"
        bpmn_file_location = "message_send_one_conversation"
        process_model = self.create_group_and_model_with_bpmn(
            client,
            with_super_admin_user,
            process_group_id=process_group_id,
            process_model_id=process_model_id,
            bpmn_file_name=bpmn_file_name,
            bpmn_file_location=bpmn_file_location,
        )

        def _file_data(i: int, c: bytes) -> str:
            # data-url form the frontend submits for an uploaded file
            b64 = base64.b64encode(c).decode()
            return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}"

        file_contents = [f"contents{i}".encode() for i in range(3)]
        file_data = [_file_data(i, c) for i, c in enumerate(file_contents)]
        digests = [sha256(c).hexdigest() for c in file_contents]
        # NOTE: a discarded list comprehension building digest-reference strings (and its
        # `_digest_reference` helper) was removed here — its result was never used.

        # files nested at several depths to exercise the recursive replacement logic
        payload = {
            "customer_id": "sartography",
            "po_number": "1001",
            "amount": "One Billion Dollars! Mwhahahahahaha",
            "description": "But seriously.",
            "file0": file_data[0],
            "key": [{"file1": file_data[1]}],
            "key2": {"key3": [{"key4": file_data[2], "key5": "bob"}]},
        }

        response = self.create_process_instance_from_process_model_id_with_api(
            client,
            process_model.id,
            self.logged_in_headers(with_super_admin_user),
        )
        assert response.json is not None
        process_instance_id = response.json["id"]

        process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
        processor = ProcessInstanceProcessor(process_instance)
        processor.do_engine_steps(save=True)
        task = processor.get_all_user_tasks()[0]
        human_task = process_instance.active_human_tasks[0]

        # submitting the form is what stores the uploaded files
        ProcessInstanceService.complete_form_task(
            processor,
            task,
            payload,
            with_super_admin_user,
            human_task,
        )
        processor.save()

        for expected_content, digest in zip(file_contents, digests, strict=True):
            # the download endpoint must return the original bytes
            response = client.get(
                f"/v1.0/process-data-file-download/{self.modify_process_identifier_for_path_param(process_model.id)}/{process_instance_id}/{digest}",
                headers=self.logged_in_headers(with_super_admin_user),
            )
            assert response.status_code == 200
            assert response.data == expected_content

            # and the backing file must exist at the hashed location on disk
            dir_parts = ProcessInstanceFileDataModel.get_hashed_directory_structure(digest)
            filepath = os.path.join(
                app.config["SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH"], *dir_parts, digest
            )
            assert os.path.isfile(filepath)
|
||||
|
||||
def test_process_instance_can_be_terminated(
|
||||
self,
|
||||
app: Flask,
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
import hashlib
|
||||
|
||||
from flask.app import Flask
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
|
||||
class TestProcessInstanceFileData(BaseTest):
    def test_returns_correct_dir_structure_from_digest(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
    ) -> None:
        """The hashed directory structure is the digest's leading characters, chunked in pairs."""
        sha = hashlib.sha256(b"OH YEAH").hexdigest()
        # pin the digest first so a failure below is unambiguous
        assert sha == "b65b894bb56d8cf56e1045bbac80ea1d313640f7ee3ee724f43b2a07be5bff5f"
        assert ProcessInstanceFileDataModel.get_hashed_directory_structure(sha) == ["b6", "5b"]
|
|
@ -0,0 +1,64 @@
|
|||
import hashlib
|
||||
import json
|
||||
|
||||
from flask.app import Flask
|
||||
from flask.testing import FlaskClient
|
||||
from spiffworkflow_backend.data_migrations.process_instance_file_data_migrator import ProcessInstanceFileDataMigrator
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_file_data import PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
|
||||
class TestProcessInstanceFileDataMigrator(BaseTest):
    """Tests for ProcessInstanceFileDataMigrator."""

    def test_can_migrate_from_db_to_fs(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
    ) -> None:
        """Seed db-stored file data, run the migrator, and verify contents moved to the file system."""
        # run a real process instance so the file data row has a valid process_instance_id to point at
        process_model = load_test_spec(
            process_model_id="test_group/random_fact",
            bpmn_file_name="random_fact_set.bpmn",
            process_model_source_directory="random_fact",
        )
        process_instance = self.create_process_instance_from_process_model(process_model=process_model)
        processor = ProcessInstanceProcessor(process_instance)
        processor.do_engine_steps(save=True)
        assert process_instance.status == ProcessInstanceStatus.complete.value

        # seed a file data row the old way: contents stored directly in the db column
        test_file_one_contents = json.dumps({"hello": "hey"})
        test_file_one_digest = hashlib.sha256(test_file_one_contents.encode()).hexdigest()
        pi_files = [
            {
                "mimetype": "application/json",
                "filename": "test_file_one.json",
                "contents": test_file_one_contents,
                "digest": test_file_one_digest,
            }
        ]
        for pi_file in pi_files:
            # NOTE(review): updated_at/created_at are not set here — presumably the base
            # model fills them in; confirm against SpiffworkflowBaseDBModel.
            pi_model = ProcessInstanceFileDataModel(
                process_instance_id=process_instance.id,
                mimetype=pi_file["mimetype"],
                filename=pi_file["filename"],
                contents=pi_file["contents"].encode(),
                digest=pi_file["digest"],
            )
            db.session.add(pi_model)
        db.session.commit()

        # the migrator (and the subsequent reads) only touch the file system when this
        # config value is set; root_path is used for convenience since it gets cleaned up
        with self.app_config_mock(
            app, "SPIFFWORKFLOW_BACKEND_PROCESS_INSTANCE_FILE_DATA_FILESYSTEM_PATH", ProcessModelService.root_path()
        ):
            ProcessInstanceFileDataMigrator.migrate_from_database_to_filesystem()

            test_file_one_model = ProcessInstanceFileDataModel.query.filter_by(filename="test_file_one.json").first()
            assert test_file_one_model is not None
            # db column now holds only the marker …
            assert test_file_one_model.contents == PROCESS_INSTANCE_DATA_FILE_ON_FILE_SYSTEM.encode()
            # … while get_contents transparently reads the real bytes back from disk
            assert test_file_one_model.get_contents() == test_file_one_contents.encode()
|
|
@ -98,7 +98,7 @@ class TestProcessInstanceService(BaseTest):
|
|||
assert model.process_instance_id == process_instance_id
|
||||
assert model.mimetype == "some/mimetype"
|
||||
assert model.filename == f"testing{i}.txt"
|
||||
assert model.contents == _file_content(i) # type: ignore
|
||||
assert model.contents == _file_content(i)
|
||||
assert model.digest == _digest(i)
|
||||
|
||||
def test_does_not_skip_events_it_does_not_know_about(self) -> None:
|
||||
|
|
Loading…
Reference in New Issue