mirror of
https://github.com/sartography/spiffworkflow-backend.git
synced 2025-02-24 13:28:31 +00:00
tests are passing w/ burnettk
This commit is contained in:
parent
c4a933ec4c
commit
ba75d5a9fc
@ -282,10 +282,10 @@ def process_instance_create(
|
||||
|
||||
|
||||
def process_instance_run(
|
||||
_process_group_id: str,
|
||||
_process_model_id: str,
|
||||
process_group_id: str,
|
||||
process_model_id: str,
|
||||
process_instance_id: int,
|
||||
_do_engine_steps: bool = False,
|
||||
do_engine_steps: bool = False,
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process_instance_run."""
|
||||
process_instance = ProcessInstanceService().get_process_instance(
|
||||
|
@ -179,15 +179,15 @@ class ProcessModelService(FileSystemService):
|
||||
return []
|
||||
return process_group.process_models
|
||||
|
||||
def get_process_group(self, process_group_id: str) -> Optional[ProcessGroup]:
|
||||
def get_process_group(self, process_group_id: str) -> ProcessGroup:
|
||||
"""Look for a given process_group, and return it."""
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
return None # Nothing to scan yet. There are no files.
|
||||
if os.path.exists(FileSystemService.root_path()):
|
||||
with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
for item in directory_items:
|
||||
if item.is_dir() and item.name == process_group_id:
|
||||
return self.__scan_process_group(item)
|
||||
return None
|
||||
|
||||
raise ProcessEntityNotFoundError("process_group_not_found", f"Process Group Id: {process_group_id}")
|
||||
|
||||
def add_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
|
||||
"""Add_process_group."""
|
||||
|
@ -18,7 +18,11 @@ def assure_process_group_exists(process_group_id: Optional[str] = None) -> Proce
|
||||
process_group = None
|
||||
workflow_spec_service = ProcessModelService()
|
||||
if process_group_id is not None:
|
||||
try:
|
||||
process_group = workflow_spec_service.get_process_group(process_group_id)
|
||||
except ProcessEntityNotFoundError:
|
||||
process_group = None
|
||||
|
||||
if process_group is None:
|
||||
process_group_id_to_create = "test_process_group"
|
||||
if process_group_id is not None:
|
||||
|
@ -234,6 +234,7 @@ def test_process_group_add(
|
||||
|
||||
# Check what is returned
|
||||
result = ProcessGroupSchema().loads(response.get_data(as_text=True))
|
||||
assert result is not None
|
||||
assert result.display_name == "Another Test Category"
|
||||
assert result.id == "test"
|
||||
|
||||
@ -262,8 +263,8 @@ def test_process_group_delete(
|
||||
f"/v1.0/process-groups/{process_group_id}", headers=logged_in_headers(user)
|
||||
)
|
||||
|
||||
deleted = ProcessModelService().get_process_group(process_group_id)
|
||||
assert deleted is None
|
||||
with pytest.raises(ProcessEntityNotFoundError):
|
||||
ProcessModelService().get_process_group(process_group_id)
|
||||
|
||||
|
||||
def test_process_group_update(
|
||||
@ -585,12 +586,14 @@ def test_process_instance_run(
|
||||
response = create_process_instance(
|
||||
client, process_group_id, process_model_id, headers
|
||||
)
|
||||
assert response.json is not None
|
||||
process_instance_id = response.json["id"]
|
||||
response = client.post(
|
||||
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run",
|
||||
headers=logged_in_headers(user),
|
||||
)
|
||||
|
||||
assert response.json is not None
|
||||
assert type(response.json["updated_at_in_seconds"]) is int
|
||||
assert response.json["updated_at_in_seconds"] > 0
|
||||
assert response.json["status"] == "complete"
|
||||
@ -832,6 +835,8 @@ def test_error_handler(
|
||||
response = create_process_instance(
|
||||
client, process_group_id, process_model_id, headers
|
||||
)
|
||||
|
||||
assert response.json is not None
|
||||
process_instance_id = response.json["id"]
|
||||
process = (
|
||||
db.session.query(ProcessInstanceModel)
|
||||
@ -961,19 +966,19 @@ def create_process_model(
|
||||
|
||||
def create_spec_file(
|
||||
client: FlaskClient,
|
||||
process_group_id: str = None,
|
||||
process_model_id: str = None,
|
||||
file_name: str = None,
|
||||
file_data: bytes = None,
|
||||
process_group_id: str = "",
|
||||
process_model_id: str = "",
|
||||
file_name: str = "",
|
||||
file_data: bytes = b"",
|
||||
) -> Dict[str, Optional[Union[str, bool, int]]]:
|
||||
"""Test_create_spec_file."""
|
||||
if process_group_id is None:
|
||||
if process_group_id == "":
|
||||
process_group_id = "random_fact"
|
||||
if process_model_id is None:
|
||||
if process_model_id == "":
|
||||
process_model_id = "random_fact"
|
||||
if file_name is None:
|
||||
if file_name == "":
|
||||
file_name = "random_fact.svg"
|
||||
if file_data is None:
|
||||
if file_data == b"":
|
||||
file_data = b"abcdef"
|
||||
spec = load_test_spec(process_model_id, process_group_id=process_group_id)
|
||||
data = {"file": (io.BytesIO(file_data), file_name)}
|
||||
|
Loading…
x
Reference in New Issue
Block a user