Make file upload detection more flexible, add integration test (#854)
This commit is contained in:
parent
c96b4b751a
commit
ad3e63367f
|
@ -0,0 +1,42 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: bc2b84d013e0
|
||||
Revises: 3191627ae224
|
||||
Create Date: 2024-01-03 13:06:57.981736
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'bc2b84d013e0'
|
||||
down_revision = '3191627ae224'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('process_instance_file_data', schema=None) as batch_op:
|
||||
batch_op.drop_column('identifier')
|
||||
batch_op.drop_column('list_index')
|
||||
|
||||
with op.batch_alter_table('task', schema=None) as batch_op:
|
||||
batch_op.drop_index('ix_task_guid')
|
||||
batch_op.create_index(batch_op.f('ix_task_guid'), ['guid'], unique=True)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('task', schema=None) as batch_op:
|
||||
batch_op.drop_index(batch_op.f('ix_task_guid'))
|
||||
batch_op.create_index('ix_task_guid', ['guid'], unique=False)
|
||||
|
||||
with op.batch_alter_table('process_instance_file_data', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('list_index', mysql.INTEGER(), autoincrement=False, nullable=True))
|
||||
batch_op.add_column(sa.Column('identifier', mysql.VARCHAR(length=255), nullable=False))
|
||||
|
||||
# ### end Alembic commands ###
|
|
@ -14,13 +14,10 @@ class ProcessInstanceFileDataModel(SpiffworkflowBaseDBModel):
|
|||
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False, index=True) # type: ignore
|
||||
identifier: str = db.Column(db.String(255), nullable=False)
|
||||
list_index: int | None = db.Column(db.Integer, nullable=True)
|
||||
mimetype: str = db.Column(db.String(255), nullable=False)
|
||||
filename: str = db.Column(db.String(255), nullable=False)
|
||||
# this is not deferred because there is no reason to query this model if you do not want the contents
|
||||
contents: str = db.Column(db.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=False)
|
||||
digest: str = db.Column(db.String(64), nullable=False, index=True)
|
||||
|
||||
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||
created_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||
|
|
|
@ -2,6 +2,7 @@ import base64
|
|||
import hashlib
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
|
@ -46,6 +47,8 @@ from spiffworkflow_backend.services.workflow_execution_service import TaskRunnab
|
|||
from spiffworkflow_backend.services.workflow_service import WorkflowService
|
||||
from spiffworkflow_backend.specs.start_event import StartConfiguration
|
||||
|
||||
FileDataGenerator = Generator[tuple[dict | list, str | int, str], None, None]
|
||||
|
||||
|
||||
class ProcessInstanceService:
|
||||
FILE_DATA_DIGEST_PREFIX = "spifffiledatadigest+"
|
||||
|
@ -359,33 +362,28 @@ class ProcessInstanceService:
|
|||
@classmethod
|
||||
def file_data_model_for_value(
|
||||
cls,
|
||||
identifier: str,
|
||||
value: str,
|
||||
process_instance_id: int,
|
||||
) -> ProcessInstanceFileDataModel | None:
|
||||
if value.startswith("data:"):
|
||||
try:
|
||||
parts = value.split(";")
|
||||
mimetype = parts[0][5:]
|
||||
filename = unquote(parts[1].split("=")[1])
|
||||
base64_value = parts[2].split(",")[1]
|
||||
if not base64_value.startswith(cls.FILE_DATA_DIGEST_PREFIX):
|
||||
contents = base64.b64decode(base64_value)
|
||||
digest = hashlib.sha256(contents).hexdigest()
|
||||
now_in_seconds = round(time.time())
|
||||
with suppress(Exception):
|
||||
parts = value.split(";")
|
||||
mimetype = parts[0][5:]
|
||||
filename = unquote(parts[1].split("=")[1])
|
||||
base64_value = parts[2].split(",")[1]
|
||||
if not base64_value.startswith(cls.FILE_DATA_DIGEST_PREFIX):
|
||||
contents = base64.b64decode(base64_value)
|
||||
digest = hashlib.sha256(contents).hexdigest()
|
||||
now_in_seconds = round(time.time())
|
||||
|
||||
return ProcessInstanceFileDataModel(
|
||||
process_instance_id=process_instance_id,
|
||||
identifier=identifier,
|
||||
mimetype=mimetype,
|
||||
filename=filename,
|
||||
contents=contents, # type: ignore
|
||||
digest=digest,
|
||||
updated_at_in_seconds=now_in_seconds,
|
||||
created_at_in_seconds=now_in_seconds,
|
||||
)
|
||||
except Exception as e:
|
||||
current_app.logger.warning(e)
|
||||
return ProcessInstanceFileDataModel(
|
||||
process_instance_id=process_instance_id,
|
||||
mimetype=mimetype,
|
||||
filename=filename,
|
||||
contents=contents, # type: ignore
|
||||
digest=digest,
|
||||
updated_at_in_seconds=now_in_seconds,
|
||||
created_at_in_seconds=now_in_seconds,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
@ -393,66 +391,52 @@ class ProcessInstanceService:
|
|||
def possible_file_data_values(
|
||||
cls,
|
||||
data: dict[str, Any],
|
||||
) -> Generator[tuple[str, str, int | None], None, None]:
|
||||
for identifier, value in data.items():
|
||||
if isinstance(value, str):
|
||||
yield (identifier, value, None)
|
||||
if isinstance(value, list):
|
||||
for list_index, list_value in enumerate(value):
|
||||
if isinstance(list_value, str):
|
||||
yield (identifier, list_value, list_index)
|
||||
if isinstance(list_value, dict) and len(list_value) == 1:
|
||||
for v in list_value.values():
|
||||
if isinstance(v, str):
|
||||
yield (identifier, v, list_index)
|
||||
) -> FileDataGenerator:
|
||||
def values(collection: dict | list, elem: str | int | None, value: Any) -> FileDataGenerator:
|
||||
match (collection, elem, value):
|
||||
case (dict(), None, None):
|
||||
for k, v in collection.items(): # type: ignore
|
||||
yield from values(collection, k, v)
|
||||
case (list(), None, None):
|
||||
for i, v in enumerate(collection):
|
||||
yield from values(collection, i, v)
|
||||
case (_, _, dict() | list()):
|
||||
yield from values(value, None, None)
|
||||
case (_, _, str()) if elem is not None and value.startswith("data:"):
|
||||
yield (collection, elem, value)
|
||||
|
||||
yield from values(data, None, None)
|
||||
|
||||
@classmethod
|
||||
def file_data_models_for_data(
|
||||
def replace_file_data_with_digest_references(
|
||||
cls,
|
||||
data: dict[str, Any],
|
||||
process_instance_id: int,
|
||||
) -> list[ProcessInstanceFileDataModel]:
|
||||
models = []
|
||||
|
||||
for identifier, value, list_index in cls.possible_file_data_values(data):
|
||||
model = cls.file_data_model_for_value(identifier, value, process_instance_id)
|
||||
if model is not None:
|
||||
model.list_index = list_index
|
||||
models.append(model)
|
||||
for collection, elem, value in cls.possible_file_data_values(data):
|
||||
model = cls.file_data_model_for_value(value, process_instance_id)
|
||||
if model is None:
|
||||
continue
|
||||
models.append(model)
|
||||
digest_reference = f"data:{model.mimetype};name={model.filename};base64,{cls.FILE_DATA_DIGEST_PREFIX}{model.digest}"
|
||||
collection[elem] = digest_reference # type: ignore
|
||||
|
||||
return models
|
||||
|
||||
@classmethod
|
||||
def replace_file_data_with_digest_references(
|
||||
cls,
|
||||
data: dict[str, Any],
|
||||
models: list[ProcessInstanceFileDataModel],
|
||||
) -> None:
|
||||
for model in models:
|
||||
digest_reference = f"data:{model.mimetype};name={model.filename};base64,{cls.FILE_DATA_DIGEST_PREFIX}{model.digest}"
|
||||
if model.list_index is None:
|
||||
data[model.identifier] = digest_reference
|
||||
else:
|
||||
old_value = data[model.identifier][model.list_index]
|
||||
new_value: Any = digest_reference
|
||||
if isinstance(old_value, dict) and len(old_value) == 1:
|
||||
new_value = {k: digest_reference for k in old_value.keys()}
|
||||
data[model.identifier][model.list_index] = new_value
|
||||
|
||||
@classmethod
|
||||
def save_file_data_and_replace_with_digest_references(
|
||||
cls,
|
||||
data: dict[str, Any],
|
||||
process_instance_id: int,
|
||||
) -> None:
|
||||
models = cls.file_data_models_for_data(data, process_instance_id)
|
||||
models = cls.replace_file_data_with_digest_references(data, process_instance_id)
|
||||
|
||||
for model in models:
|
||||
db.session.add(model)
|
||||
db.session.commit()
|
||||
|
||||
cls.replace_file_data_with_digest_references(data, models)
|
||||
|
||||
@classmethod
|
||||
def update_form_task_data(
|
||||
cls,
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
"""Test Process Api Blueprint."""
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
|
@ -1643,6 +1644,80 @@ class TestProcessApi(BaseTest):
|
|||
assert response.json
|
||||
assert response.json["error_code"] == "message_not_accepted"
|
||||
|
||||
def test_can_download_uploaded_file(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
process_group_id = "test_message_send"
|
||||
process_model_id = "message_sender"
|
||||
bpmn_file_name = "message_sender.bpmn"
|
||||
bpmn_file_location = "message_send_one_conversation"
|
||||
process_model = self.create_group_and_model_with_bpmn(
|
||||
client,
|
||||
with_super_admin_user,
|
||||
process_group_id=process_group_id,
|
||||
process_model_id=process_model_id,
|
||||
bpmn_file_name=bpmn_file_name,
|
||||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
def _file_data(i: int, c: bytes) -> str:
|
||||
b64 = base64.b64encode(c).decode()
|
||||
return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}"
|
||||
|
||||
def _digest_reference(i: int, sha: str) -> str:
|
||||
b64 = f"{ProcessInstanceService.FILE_DATA_DIGEST_PREFIX}{sha}"
|
||||
return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}"
|
||||
|
||||
file_contents = [f"contents{i}".encode() for i in range(3)]
|
||||
file_data = [_file_data(i, c) for i, c in enumerate(file_contents)]
|
||||
digests = [sha256(c).hexdigest() for c in file_contents]
|
||||
[_digest_reference(i, d) for i, d in enumerate(digests)]
|
||||
|
||||
payload = {
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
"amount": "One Billion Dollars! Mwhahahahahaha",
|
||||
"description": "But seriously.",
|
||||
"file0": file_data[0],
|
||||
"key": [{"file1": file_data[1]}],
|
||||
"key2": {"key3": [{"key4": file_data[2], "key5": "bob"}]},
|
||||
}
|
||||
|
||||
response = self.create_process_instance_from_process_model_id_with_api(
|
||||
client,
|
||||
process_model.id,
|
||||
self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert response.json is not None
|
||||
process_instance_id = response.json["id"]
|
||||
|
||||
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
processor.do_engine_steps(save=True)
|
||||
task = processor.get_all_user_tasks()[0]
|
||||
human_task = process_instance.active_human_tasks[0]
|
||||
|
||||
ProcessInstanceService.complete_form_task(
|
||||
processor,
|
||||
task,
|
||||
payload,
|
||||
with_super_admin_user,
|
||||
human_task,
|
||||
)
|
||||
processor.save()
|
||||
|
||||
for expected_content, digest in zip(file_contents, digests, strict=True):
|
||||
response = client.get(
|
||||
f"/v1.0/process-data-file-download/{self.modify_process_identifier_for_path_param(process_model.id)}/{process_instance_id}/{digest}",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.data == expected_content
|
||||
|
||||
def test_process_instance_can_be_terminated(
|
||||
self,
|
||||
app: Flask,
|
||||
|
|
|
@ -1,202 +1,105 @@
|
|||
import base64
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from flask.app import Flask
|
||||
from SpiffWorkflow.bpmn.event import PendingBpmnEvent # type: ignore
|
||||
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
|
||||
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
|
||||
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
|
||||
def _file_content(i: int) -> bytes:
|
||||
return f"testing{i}\n".encode()
|
||||
|
||||
|
||||
def _file_data(i: int) -> str:
|
||||
b64 = base64.b64encode(_file_content(i)).decode()
|
||||
return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}"
|
||||
|
||||
|
||||
def _digest(i: int) -> str:
|
||||
return hashlib.sha256(_file_content(i)).hexdigest()
|
||||
|
||||
|
||||
def _digest_reference(i: int) -> str:
|
||||
sha = _digest(i)
|
||||
b64 = f"{ProcessInstanceService.FILE_DATA_DIGEST_PREFIX}{sha}"
|
||||
return f"data:some/mimetype;name=testing{i}.txt;base64,{b64}"
|
||||
|
||||
|
||||
class TestProcessInstanceService(BaseTest):
|
||||
SAMPLE_FILE_DATA = "data:some/mimetype;name=testing.txt;base64,dGVzdGluZwo="
|
||||
SAMPLE_DIGEST_REFERENCE = f"data:some/mimetype;name=testing.txt;base64,{ProcessInstanceService.FILE_DATA_DIGEST_PREFIX}12a61f4e173fb3a11c05d6471f74728f76231b4a5fcd9667cef3af87a3ae4dc2" # noqa: B950,E501
|
||||
|
||||
def _check_sample_file_data_model(
|
||||
self,
|
||||
identifier: str,
|
||||
list_index: int | None,
|
||||
model: ProcessInstanceFileDataModel | None,
|
||||
) -> None:
|
||||
assert model is not None
|
||||
assert model.identifier == identifier
|
||||
assert model.process_instance_id == 111
|
||||
assert model.list_index == list_index
|
||||
assert model.mimetype == "some/mimetype"
|
||||
assert model.filename == "testing.txt"
|
||||
assert model.contents == b"testing\n" # type: ignore
|
||||
assert model.digest == "12a61f4e173fb3a11c05d6471f74728f76231b4a5fcd9667cef3af87a3ae4dc2"
|
||||
|
||||
def test_can_create_file_data_model_for_file_data_value(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
model = ProcessInstanceService.file_data_model_for_value(
|
||||
"uploaded_file",
|
||||
self.SAMPLE_FILE_DATA,
|
||||
111,
|
||||
)
|
||||
self._check_sample_file_data_model("uploaded_file", None, model)
|
||||
|
||||
def test_does_not_create_file_data_model_for_non_file_data_value(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
model = ProcessInstanceService.file_data_model_for_value(
|
||||
"not_a_file",
|
||||
"just a value",
|
||||
111,
|
||||
)
|
||||
assert model is None
|
||||
|
||||
def test_can_create_file_data_models_for_single_file_data_values(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"uploaded_file": self.SAMPLE_FILE_DATA,
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
|
||||
assert len(models) == 1
|
||||
self._check_sample_file_data_model("uploaded_file", None, models[0])
|
||||
|
||||
def test_can_create_file_data_models_for_multiple_file_data_values(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
|
||||
assert len(models) == 2
|
||||
self._check_sample_file_data_model("uploaded_files", 0, models[0])
|
||||
self._check_sample_file_data_model("uploaded_files", 1, models[1])
|
||||
|
||||
def test_can_create_file_data_models_for_mix_of_file_data_and_non_file_data_values(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"not_a_file": "just a value",
|
||||
"uploaded_file": self.SAMPLE_FILE_DATA,
|
||||
"not_a_file2": "just a value 2",
|
||||
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
|
||||
"not_a_file3": "just a value 3",
|
||||
"uploaded_files2": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
|
||||
"uploaded_file2": self.SAMPLE_FILE_DATA,
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
|
||||
assert len(models) == 6
|
||||
self._check_sample_file_data_model("uploaded_file", None, models[0])
|
||||
self._check_sample_file_data_model("uploaded_files", 0, models[1])
|
||||
self._check_sample_file_data_model("uploaded_files", 1, models[2])
|
||||
self._check_sample_file_data_model("uploaded_files2", 0, models[3])
|
||||
self._check_sample_file_data_model("uploaded_files2", 1, models[4])
|
||||
self._check_sample_file_data_model("uploaded_file2", None, models[5])
|
||||
|
||||
def test_does_not_create_file_data_models_for_non_file_data_values(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"not_a_file": "just a value",
|
||||
"also_no_files": ["not a file", "also not a file"],
|
||||
"still_no_files": [{"key": "value"}],
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
|
||||
assert len(models) == 0
|
||||
|
||||
def test_can_replace_file_data_values_with_digest_references(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"uploaded_file": self.SAMPLE_FILE_DATA,
|
||||
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
ProcessInstanceService.replace_file_data_with_digest_references(data, models)
|
||||
|
||||
assert data == {
|
||||
"uploaded_file": self.SAMPLE_DIGEST_REFERENCE,
|
||||
"uploaded_files": [
|
||||
self.SAMPLE_DIGEST_REFERENCE,
|
||||
self.SAMPLE_DIGEST_REFERENCE,
|
||||
],
|
||||
}
|
||||
|
||||
def test_does_not_replace_non_file_data_values_with_digest_references(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"not_a_file": "just a value",
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
ProcessInstanceService.replace_file_data_with_digest_references(data, models)
|
||||
|
||||
assert len(data) == 1
|
||||
assert data["not_a_file"] == "just a value"
|
||||
|
||||
def test_can_replace_file_data_values_with_digest_references_when_non_file_data_values_are_present(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"not_a_file": "just a value",
|
||||
"uploaded_file": self.SAMPLE_FILE_DATA,
|
||||
"not_a_file2": "just a value2",
|
||||
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
|
||||
"not_a_file3": "just a value3",
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
ProcessInstanceService.replace_file_data_with_digest_references(data, models)
|
||||
|
||||
assert data == {
|
||||
"not_a_file": "just a value",
|
||||
"uploaded_file": self.SAMPLE_DIGEST_REFERENCE,
|
||||
"not_a_file2": "just a value2",
|
||||
"uploaded_files": [
|
||||
self.SAMPLE_DIGEST_REFERENCE,
|
||||
self.SAMPLE_DIGEST_REFERENCE,
|
||||
],
|
||||
"not_a_file3": "just a value3",
|
||||
}
|
||||
|
||||
def test_can_create_file_data_models_for_mulitple_single_file_data_values(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
data = {
|
||||
"File": [
|
||||
@pytest.mark.parametrize(
|
||||
"data,expected_data,expected_models_len",
|
||||
[
|
||||
({}, {}, 0),
|
||||
({"k": "v"}, {"k": "v"}, 0),
|
||||
({"k": _file_data(0)}, {"k": _digest_reference(0)}, 1),
|
||||
({"k": [_file_data(0)]}, {"k": [_digest_reference(0)]}, 1),
|
||||
({"k": [_file_data(0), _file_data(1)]}, {"k": [_digest_reference(0), _digest_reference(1)]}, 2),
|
||||
({"k": [{"k2": _file_data(0)}]}, {"k": [{"k2": _digest_reference(0)}]}, 1),
|
||||
(
|
||||
{"k": [{"k2": _file_data(0)}, {"k2": _file_data(1)}, {"k2": _file_data(2)}]},
|
||||
{"k": [{"k2": _digest_reference(0)}, {"k2": _digest_reference(1)}, {"k2": _digest_reference(2)}]},
|
||||
3,
|
||||
),
|
||||
({"k": [{"k2": _file_data(0), "k3": "bob"}]}, {"k": [{"k2": _digest_reference(0), "k3": "bob"}]}, 1),
|
||||
(
|
||||
{"k": [{"k2": _file_data(0), "k3": "bob"}, {"k2": _file_data(1), "k3": "bob"}]},
|
||||
{"k": [{"k2": _digest_reference(0), "k3": "bob"}, {"k2": _digest_reference(1), "k3": "bob"}]},
|
||||
2,
|
||||
),
|
||||
(
|
||||
{
|
||||
"supporting_files": self.SAMPLE_FILE_DATA,
|
||||
"k": [
|
||||
{
|
||||
"k2": [
|
||||
{"k3": [_file_data(0)]},
|
||||
{"k4": {"k5": {"k6": [_file_data(1), _file_data(2)], "k7": _file_data(3)}}},
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"supporting_files": self.SAMPLE_FILE_DATA,
|
||||
"k": [
|
||||
{
|
||||
"k2": [
|
||||
{"k3": [_digest_reference(0)]},
|
||||
{"k4": {"k5": {"k6": [_digest_reference(1), _digest_reference(2)], "k7": _digest_reference(3)}}},
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
],
|
||||
}
|
||||
models = ProcessInstanceService.file_data_models_for_data(data, 111)
|
||||
4,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_save_file_data_v0(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
data: dict[str, Any],
|
||||
expected_data: dict[str, Any],
|
||||
expected_models_len: int,
|
||||
) -> None:
|
||||
process_instance_id = 123
|
||||
models = ProcessInstanceService.replace_file_data_with_digest_references(
|
||||
data,
|
||||
process_instance_id,
|
||||
)
|
||||
|
||||
assert len(models) == 2
|
||||
self._check_sample_file_data_model("File", 0, models[0])
|
||||
self._check_sample_file_data_model("File", 1, models[1])
|
||||
assert data == expected_data
|
||||
assert len(models) == expected_models_len
|
||||
|
||||
for i, model in enumerate(models):
|
||||
assert model.process_instance_id == process_instance_id
|
||||
assert model.mimetype == "some/mimetype"
|
||||
assert model.filename == f"testing{i}.txt"
|
||||
assert model.contents == _file_content(i) # type: ignore
|
||||
assert model.digest == _digest(i)
|
||||
|
||||
def test_does_not_skip_events_it_does_not_know_about(self) -> None:
|
||||
name = None
|
||||
|
|
Loading…
Reference in New Issue