First stab at nested folders.

Added temp endpoints
Changes to tests and test helpers
This commit is contained in:
mike cullerton 2022-11-01 16:48:04 -04:00
parent 0642c16467
commit 5f41828baf
33 changed files with 1555 additions and 508 deletions

View File

@ -4,6 +4,7 @@ import shutil
import pytest import pytest
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -78,17 +79,35 @@ def with_super_admin_user() -> UserModel:
@pytest.fixture() @pytest.fixture()
def setup_process_instances_for_reports() -> list[ProcessInstanceModel]: def setup_process_instances_for_reports(client: FlaskClient, with_super_admin_user: UserModel) -> list[ProcessInstanceModel]:
"""Setup_process_instances_for_reports.""" """Setup_process_instances_for_reports."""
user = BaseTest.find_or_create_user() user = with_super_admin_user
process_group_id = "runs_without_input" process_group_id = "runs_without_input"
process_model_id = "sample" process_model_id = "sample"
load_test_spec(process_group_id=process_group_id, process_model_id=process_model_id) bpmn_file_name = "sample.bpmn"
bpmn_file_location = "sample"
process_model_identifier = BaseTest().basic_test_setup(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model_id=process_model_id,
# bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location
)
# BaseTest().create_process_group(
# client=client, user=user, process_group_id=process_group_id, display_name=process_group_id
# )
# process_model_id = "runs_without_input/sample"
# load_test_spec(
# process_model_id=f"{process_group_id}/{process_model_id}",
# process_model_source_directory="sample"
# )
process_instances = [] process_instances = []
for data in [kay(), ray(), jay()]: for data in [kay(), ray(), jay()]:
process_instance = ProcessInstanceService.create_process_instance( process_instance = ProcessInstanceService.create_process_instance(
process_group_identifier=process_group_id, # process_group_identifier=process_group_id,
process_model_identifier=process_model_id, process_model_identifier=process_model_identifier,
user=user, user=user,
) )
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)

View File

@ -288,6 +288,36 @@ paths:
schema: schema:
$ref: "#/components/schemas/ProcessModel" $ref: "#/components/schemas/ProcessModel"
/process-models/{modified_process_model_id}/files:
parameters:
- name: modified_process_model_id
in: path
required: true
description: The process_model_id, modified to replace slashes (/)
schema:
type: string
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.add_file_2
summary: Add a new workflow spec file
tags:
- Process Model Files
requestBody:
content:
multipart/form-data:
schema:
type: object
properties:
file:
type: string
format: binary
responses:
"201":
description: Metadata about the uploaded file, but not the file content.
content:
application/json:
schema:
$ref: "#/components/schemas/File"
/process-models/{process_group_id}/{process_model_id}/files: /process-models/{process_group_id}/{process_model_id}/files:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -339,6 +369,56 @@ paths:
# items: # items:
# $ref: "#/components/schemas/File" # $ref: "#/components/schemas/File"
/process-models/{modified_process_model_identifier}:
parameters:
- name: modified_process_model_identifier
in: path
required: true
description: the modified process model id
schema:
type: string
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_show_2
summary: Returns a single process model
tags:
- Process Models
responses:
"200":
description: Workflow spec.
content:
application/json:
schema:
$ref: "#/components/schemas/ProcessModel"
put:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_update_2
summary: Modifies an existing process model with the given parameters.
tags:
- Process Models
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ProcessModel"
responses:
"200":
description: Process model updated successfully.
content:
application/json:
schema:
$ref: "#/components/schemas/ProcessModel"
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_delete_2
summary: Removes an existing process model
tags:
- Process Models
responses:
"200":
description: The process model has been removed.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
/process-models/{process_group_id}/{process_model_id}: /process-models/{process_group_id}/{process_model_id}:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -550,6 +630,28 @@ paths:
schema: schema:
$ref: "#/components/schemas/Workflow" $ref: "#/components/schemas/Workflow"
/process-models/{modified_process_model_id}/process-instances:
parameters:
- name: modified_process_model_id
in: path
required: true
description: The unique id of an existing process model.
schema:
type: string
# process_instance_create
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_create_2
summary: Creates an process instance from a process model and returns the instance
tags:
- Process Instances
responses:
"201":
description: Workflow generated successfully
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances: /process-models/{process_group_id}/{process_model_id}/process-instances:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -578,6 +680,54 @@ paths:
schema: schema:
$ref: "#/components/schemas/Workflow" $ref: "#/components/schemas/Workflow"
/process-instances/{process_instance_id}:
parameters:
- name: process_instance_id
in: path
required: true
description: The unique id of an existing process instance.
schema:
type: integer
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_delete_2
summary: Deletes a single process instance
tags:
- Process Instances
responses:
"200":
description: The process instance was deleted.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
/process-models/{modified_process_model_identifier}/process-instances/{process_instance_id}:
parameters:
- name: modified_process_model_identifier
in: path
required: true
description: The unique id of an existing process model
schema:
type: string
- name: process_instance_id
in: path
required: true
description: The unique id of an existing process instance.
schema:
type: integer
get:
tags:
- Process Instances
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_show_2
summary: Show information about a process instance
responses:
"200":
description: One Process Instance
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}: /process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -624,6 +774,34 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/process-instances/{process_instance_id}/run:
parameters:
- name: process_instance_id
in: path
required: true
description: The unique id of an existing process instance.
schema:
type: integer
- name: do_engine_steps
in: query
required: false
description: Defaults to true, can be set to false if you are just looking at the workflow not completeing it.
schema:
type: boolean
# process_instance_run
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_run_2
summary: Run a process instance
tags:
- Process Instances
responses:
"200":
description: Returns details about the workflows state and current task
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run: /process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -664,6 +842,27 @@ paths:
schema: schema:
$ref: "#/components/schemas/Workflow" $ref: "#/components/schemas/Workflow"
/process-instances/{process_instance_id}/terminate:
parameters:
- name: process_instance_id
in: path
required: true
description: The unique id of an existing process instance.
schema:
type: integer
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_terminate_2
summary: Terminate a process instance
tags:
- Process Instances
responses:
"200":
description: Empty ok true response on successful termination.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/terminate: /process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/terminate:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -763,6 +962,41 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/process-models/{modified_process_model_identifier}/process-instances/reports:
parameters:
- name: modified_process_model_identifier
in: path
required: true
description: The unique id of an existing process model
schema:
type: string
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_report_list_2
summary: Returns all process instance reports for process model
tags:
- Process Instances
responses:
"200":
description: Workflow.
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances/reports: /process-models/{process_group_id}/{process_model_id}/process-instances/reports:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -816,6 +1050,47 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/process-models/{modified_process_model_identifier}/process-instances/reports/{report_identifier}:
parameters:
- name: modified_process_model_identifier
in: path
required: true
description: The unique id of an existing process model.
schema:
type: string
- name: report_identifier
in: path
required: true
description: The unique id of an existing report
schema:
type: string
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_report_show_2
summary: Returns a report of process instances for a given process model
tags:
- Process Instances
responses:
"200":
description: Workflow.
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances/reports/{report_identifier}: /process-models/{process_group_id}/{process_model_id}/process-instances/reports/{report_identifier}:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -887,6 +1162,68 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/process-models/{modified_process_model_id}/files/{file_name}:
parameters:
- name: modified_process_model_id
in: path
required: true
description: The modified process model id
schema:
type: string
- name: file_name
in: path
required: true
description: The id of the spec file
schema:
type: string
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.get_file_2
summary: Returns metadata about the file
tags:
- Process Model Files
responses:
"200":
description: Returns the file information requested.
content:
application/json:
schema:
$ref: "#/components/schemas/File"
put:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_file_update_2
summary: save the contents to the given file
tags:
- Process Model Files
requestBody:
description: Log Pagination Request
required: false
content:
multipart/form-data:
schema:
type: object
properties:
file:
type: string
format: binary
responses:
"200":
description: Metadata about the uploaded file, but not the file content.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_file_delete_2
summary: Removes an existing process model file
tags:
- Process Model Files
responses:
"200":
description: The process model has been removed.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
/process-models/{process_group_id}/{process_model_id}/files/{file_name}: /process-models/{process_group_id}/{process_model_id}/files/{file_name}:
parameters: parameters:
- name: process_group_id - name: process_group_id
@ -1199,6 +1536,39 @@ paths:
schema: schema:
$ref: "#/components/schemas/Workflow" $ref: "#/components/schemas/Workflow"
/process-instances/{process_instance_id}/logs:
parameters:
- name: process_instance_id
in: path
required: true
description: the id of the process instance
schema:
type: integer
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The number of items to show per page. Defaults to page 10.
schema:
type: integer
get:
tags:
- Process Instances
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_instance_log_list_2
summary: returns a list of logs associated with the process instance
responses:
"200":
description: list of logs
content:
application/json:
schema:
$ref: "#/components/schemas/ProcessInstanceLog"
/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/logs: /process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/logs:
parameters: parameters:
- name: process_group_id - name: process_group_id

View File

@ -41,3 +41,15 @@ permissions:
users: [testuser4] users: [testuser4]
allowed_permissions: [create, read, update, delete] allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-models/finance/* uri: /v1.0/process-models/finance/*
finance-admin-model-lanes:
groups: ["Finance Team"]
users: [testuser4]
allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-models/finance:model_with_lanes/*
finance-admin-instance-run:
groups: ["Finance Team"]
users: [testuser4]
allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-instances/*

View File

@ -80,7 +80,7 @@ class ProcessInstanceReportModel(SpiffworkflowBaseDBModel):
"""Add_fixtures.""" """Add_fixtures."""
try: try:
process_model = ProcessModelService().get_process_model( process_model = ProcessModelService().get_process_model(
group_id="sartography-admin", process_model_id="ticket" process_model_id="sartography-admin/ticket"
) )
user = UserModel.query.first() user = UserModel.query.first()
columns = [ columns = [
@ -224,11 +224,11 @@ class ProcessInstanceReportModel(SpiffworkflowBaseDBModel):
) -> ProcessInstanceReportModel: ) -> ProcessInstanceReportModel:
"""Create_with_attributes.""" """Create_with_attributes."""
process_model = ProcessModelService().get_process_model( process_model = ProcessModelService().get_process_model(
group_id=process_group_identifier, process_model_id=process_model_identifier process_model_id=f"{process_model_identifier}"
) )
process_instance_report = cls( process_instance_report = cls(
identifier=identifier, identifier=identifier,
process_group_identifier=process_model.process_group_id, process_group_identifier="process_model.process_group_id",
process_model_identifier=process_model.id, process_model_identifier=process_model.id,
created_by_id=user.id, created_by_id=user.id,
report_metadata=report_metadata, report_metadata=report_metadata,

View File

@ -29,7 +29,7 @@ class ProcessModelInfo:
id: str id: str
display_name: str display_name: str
description: str description: str
process_group_id: str = "" # process_group_id: str = ""
process_group: Any | None = None process_group: Any | None = None
primary_file_name: str | None = None primary_file_name: str | None = None
primary_process_id: str | None = None primary_process_id: str | None = None
@ -41,7 +41,7 @@ class ProcessModelInfo:
def __post_init__(self) -> None: def __post_init__(self) -> None:
"""__post_init__.""" """__post_init__."""
self.sort_index = f"{self.process_group_id}:{self.id}" self.sort_index = self.id
def __eq__(self, other: Any) -> bool: def __eq__(self, other: Any) -> bool:
"""__eq__.""" """__eq__."""
@ -67,7 +67,6 @@ class ProcessModelInfoSchema(Schema):
primary_file_name = marshmallow.fields.String(allow_none=True) primary_file_name = marshmallow.fields.String(allow_none=True)
primary_process_id = marshmallow.fields.String(allow_none=True) primary_process_id = marshmallow.fields.String(allow_none=True)
is_review = marshmallow.fields.Boolean(allow_none=True) is_review = marshmallow.fields.Boolean(allow_none=True)
process_group_id = marshmallow.fields.String(allow_none=True)
files = marshmallow.fields.List(marshmallow.fields.Nested("FileSchema")) files = marshmallow.fields.List(marshmallow.fields.Nested("FileSchema"))
fault_or_suspend_on_exception = marshmallow.fields.String() fault_or_suspend_on_exception = marshmallow.fields.String()
exception_notification_addresses = marshmallow.fields.List( exception_notification_addresses = marshmallow.fields.List(

View File

@ -225,9 +225,10 @@ def process_model_add(
status_code=400, status_code=400,
) )
process_group_id, _ = os.path.split(process_model_info.id)
process_model_service = ProcessModelService() process_model_service = ProcessModelService()
process_group = process_model_service.get_process_group( process_group = process_model_service.get_process_group(
process_model_info.process_group_id process_group_id
) )
if process_group is None: if process_group is None:
raise ApiError( raise ApiError(
@ -245,14 +246,29 @@ def process_model_add(
) )
def process_model_delete_2(modified_process_model_identifier: str) -> flask.wrappers.Response:
process_model_id = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_id)
return process_model_delete(process_group_id, process_model_id)
def process_model_delete( def process_model_delete(
process_group_id: str, process_model_id: str process_group_id: str, process_model_id: str
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_model_delete.""" """Process_model_delete."""
ProcessModelService().process_model_delete(process_model_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
ProcessModelService().process_model_delete(process_model_identifier)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_model_update_2(
modified_process_model_identifier: str, body: Dict[str, Union[str, bool, int]]
):
process_model_identifier = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_model_update(process_group_id, process_model_id, body)
def process_model_update( def process_model_update(
process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]] process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]
) -> Any: ) -> Any:
@ -264,14 +280,24 @@ def process_model_update(
if include_item in body if include_item in body
} }
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
ProcessModelService().update_spec(process_model, body_filtered) ProcessModelService().update_spec(process_model, body_filtered)
return ProcessModelInfoSchema().dump(process_model) return ProcessModelInfoSchema().dump(process_model)
def process_model_show_2(modified_process_model_identifier: str) -> Any:
process_model_identifier = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_model_show(process_group_id, process_model_id)
def process_model_show(process_group_id: str, process_model_id: str) -> Any: def process_model_show(process_group_id: str, process_model_id: str) -> Any:
"""Process_model_show.""" """Process_model_show."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
# TODO: Temporary. Should not need the next line once models have correct ids
# process_model.id = process_model_identifier
files = sorted(SpecFileService.get_files(process_model)) files = sorted(SpecFileService.get_files(process_model))
process_model.files = files process_model.files = files
process_model_json = ProcessModelInfoSchema().dump(process_model) process_model_json = ProcessModelInfoSchema().dump(process_model)
@ -304,9 +330,16 @@ def process_model_list(
return Response(json.dumps(response_json), status=200, mimetype="application/json") return Response(json.dumps(response_json), status=200, mimetype="application/json")
def get_file_2(modified_process_model_id: str, file_name: str) -> Any:
process_model_id_string = modified_process_model_id.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_id_string)
return get_file(process_group_id, process_model_id, file_name)
def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Any: def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Any:
"""Get_file.""" """Get_file."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
files = SpecFileService.get_files(process_model, file_name) files = SpecFileService.get_files(process_model, file_name)
if len(files) == 0: if len(files) == 0:
raise ApiError( raise ApiError(
@ -320,15 +353,25 @@ def get_file(process_group_id: str, process_model_id: str, file_name: str) -> An
file_contents = SpecFileService.get_data(process_model, file.name) file_contents = SpecFileService.get_data(process_model, file.name)
file.file_contents = file_contents file.file_contents = file_contents
file.process_model_id = process_model.id file.process_model_id = process_model.id
file.process_group_id = process_model.process_group_id # file.process_group_id = process_model.process_group_id
return FileSchema().dump(file) return FileSchema().dump(file)
def process_model_file_update_2(
modified_process_model_id: str,
file_name: str
) -> flask.wrappers.Response:
process_model_id = modified_process_model_id.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_id)
return process_model_file_update(process_group_id, process_model_id, file_name)
def process_model_file_update( def process_model_file_update(
process_group_id: str, process_model_id: str, file_name: str process_group_id: str, process_model_id: str, file_name: str
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_model_file_update.""" """Process_model_file_update."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
request_file = get_file_from_request() request_file = get_file_from_request()
request_file_contents = request_file.stream.read() request_file_contents = request_file.stream.read()
@ -352,11 +395,18 @@ def process_model_file_update(
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_model_file_delete_2(modified_process_model_id, file_name):
process_model_identifier = modified_process_model_id.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_model_file_delete(process_group_id, process_model_id, file_name)
def process_model_file_delete( def process_model_file_delete(
process_group_id: str, process_model_id: str, file_name: str process_group_id: str, process_model_id: str, file_name: str
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_model_file_delete.""" """Process_model_file_delete."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
try: try:
SpecFileService.delete_file(process_model, file_name) SpecFileService.delete_file(process_model, file_name)
except FileNotFoundError as exception: except FileNotFoundError as exception:
@ -371,9 +421,16 @@ def process_model_file_delete(
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def add_file_2(modified_process_model_id: str) -> flask.wrappers.Response:
process_model_id_string = modified_process_model_id.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_id_string)
return add_file(process_group_id, process_model_id)
def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Response: def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
"""Add_file.""" """Add_file."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
request_file = get_file_from_request() request_file = get_file_from_request()
if not request_file.filename: if not request_file.filename:
raise ApiError( raise ApiError(
@ -388,18 +445,35 @@ def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Res
file_contents = SpecFileService.get_data(process_model, file.name) file_contents = SpecFileService.get_data(process_model, file.name)
file.file_contents = file_contents file.file_contents = file_contents
file.process_model_id = process_model.id file.process_model_id = process_model.id
file.process_group_id = process_model.process_group_id # file.process_group_id = process_model.process_group_id
return Response( return Response(
json.dumps(FileSchema().dump(file)), status=201, mimetype="application/json" json.dumps(FileSchema().dump(file)), status=201, mimetype="application/json"
) )
def process_instance_create_2(
modified_process_model_id: str
) -> flask.wrappers.Response:
"""Create_process_instance."""
process_model_id = modified_process_model_id.replace(":", "/")
process_instance = ProcessInstanceService.create_process_instance(
process_model_id, g.user
)
return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
status=201,
mimetype="application/json",
)
def process_instance_create( def process_instance_create(
process_group_id: str, process_model_id: str process_group_id: str, process_model_id: str
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Create_process_instance.""" """Create_process_instance."""
# process_model_id = modified_process_model_id.replace(":", "/")
process_model_identifier = f"{process_group_id}/{process_model_id}"
process_instance = ProcessInstanceService.create_process_instance( process_instance = ProcessInstanceService.create_process_instance(
process_model_id, g.user, process_group_identifier=process_group_id process_model_identifier, g.user
) )
return Response( return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)), json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
@ -408,6 +482,13 @@ def process_instance_create(
) )
def process_instance_run_2(
process_instance_id: int,
do_engine_steps: bool = True,
):
return process_instance_run(None, None, process_instance_id, do_engine_steps)
def process_instance_run( def process_instance_run(
process_group_id: str, process_group_id: str,
process_model_id: str, process_model_id: str,
@ -415,6 +496,7 @@ def process_instance_run(
do_engine_steps: bool = True, do_engine_steps: bool = True,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_instance_run.""" """Process_instance_run."""
# process_model_id = modified_process_model_id.replace(":", "/")
process_instance = ProcessInstanceService().get_process_instance( process_instance = ProcessInstanceService().get_process_instance(
process_instance_id process_instance_id
) )
@ -452,6 +534,12 @@ def process_instance_run(
) )
def process_instance_terminate_2(
process_instance_id: int
) -> flask.wrappers.Response:
return process_instance_terminate(None, None, process_instance_id, None)
def process_instance_terminate( def process_instance_terminate(
process_group_id: str, process_group_id: str,
process_model_id: str, process_model_id: str,
@ -495,6 +583,15 @@ def process_instance_resume(
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_instance_log_list_2(
process_instance_id: str,
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
print("process_instance_log_list_2")
return process_instance_log_list(None, None, process_instance_id, page, per_page)
def process_instance_log_list( def process_instance_log_list(
process_group_id: str, process_group_id: str,
process_model_id: str, process_model_id: str,
@ -672,7 +769,7 @@ def process_instance_list(
process_instance_query = ProcessInstanceModel.query process_instance_query = ProcessInstanceModel.query
if process_model_identifier is not None and process_group_identifier is not None: if process_model_identifier is not None and process_group_identifier is not None:
process_model = get_process_model( process_model = get_process_model(
process_model_identifier, process_group_identifier f"{process_group_identifier}/{process_model_identifier}",
) )
process_instance_query = process_instance_query.filter_by( process_instance_query = process_instance_query.filter_by(
@ -730,13 +827,20 @@ def process_instance_list(
return make_response(jsonify(response_json), 200) return make_response(jsonify(response_json), 200)
def process_instance_show_2(modified_process_model_identifier: str, process_instance_id: int) -> flask.wrappers.Response:
process_model_identifier = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_instance_show(process_group_id, process_model_id, process_instance_id)
def process_instance_show( def process_instance_show(
process_group_id: str, process_model_id: str, process_instance_id: int process_group_id: str, process_model_id: str, process_instance_id: int
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Create_process_instance.""" """Create_process_instance."""
process_model_identifier = f"{process_group_id}/{process_model_id}"
process_instance = find_process_instance_by_id_or_raise(process_instance_id) process_instance = find_process_instance_by_id_or_raise(process_instance_id)
current_version_control_revision = GitService.get_current_revision() current_version_control_revision = GitService.get_current_revision()
process_model = get_process_model(process_model_id, process_group_id) process_model = get_process_model(process_model_identifier)
if process_model.primary_file_name: if process_model.primary_file_name:
if ( if (
@ -755,6 +859,12 @@ def process_instance_show(
return make_response(jsonify(process_instance), 200) return make_response(jsonify(process_instance), 200)
def process_instance_delete_2(
process_instance_id: int
) -> flask.wrappers.Response:
return process_instance_delete(None, None, process_instance_id)
def process_instance_delete( def process_instance_delete(
process_group_id: str, process_model_id: str, process_instance_id: int process_group_id: str, process_model_id: str, process_instance_id: int
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
@ -769,15 +879,24 @@ def process_instance_delete(
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_instance_report_list_2(
modified_process_model_identifier: str, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
process_model_identifier = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_instance_report_list(process_group_id, process_model_id, page, per_page)
def process_instance_report_list( def process_instance_report_list(
process_group_id: str, process_model_id: str, page: int = 1, per_page: int = 100 process_group_id: str, process_model_id: str, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_instance_report_list.""" """Process_instance_report_list."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
# process_model = get_process_model(process_model_identifier)
process_instance_reports = ProcessInstanceReportModel.query.filter_by( process_instance_reports = ProcessInstanceReportModel.query.filter_by(
process_group_identifier=process_group_id, # process_group_identifier="process_group_id",
process_model_identifier=process_model.id, process_model_identifier=process_model_identifier,
).all() ).all()
return make_response(jsonify(process_instance_reports), 200) return make_response(jsonify(process_instance_reports), 200)
@ -884,6 +1003,17 @@ def authentication_callback(
) )
def process_instance_report_show_2(
modified_process_model_identifier: str,
report_identifier: str,
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
process_model_identifier = modified_process_model_identifier.replace(":", "/")
process_group_id, process_model_id = os.path.split(process_model_identifier)
return process_instance_report_show(process_group_id, process_model_id, report_identifier, page, per_page)
def process_instance_report_show( def process_instance_report_show(
process_group_id: str, process_group_id: str,
process_model_id: str, process_model_id: str,
@ -892,10 +1022,11 @@ def process_instance_report_show(
per_page: int = 100, per_page: int = 100,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_instance_list.""" """Process_instance_list."""
process_model = get_process_model(process_model_id, process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
process_instances = ( process_instances = (
ProcessInstanceModel.query.filter_by(process_model_identifier=process_model.id) ProcessInstanceModel.query.filter_by(process_model_identifier=process_model_identifier)
.order_by( .order_by(
ProcessInstanceModel.start_in_seconds.desc(), ProcessInstanceModel.id.desc() # type: ignore ProcessInstanceModel.start_in_seconds.desc(), ProcessInstanceModel.id.desc() # type: ignore
) )
@ -1002,7 +1133,6 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
process_model = get_process_model( process_model = get_process_model(
process_instance.process_model_identifier, process_instance.process_model_identifier,
process_instance.process_group_identifier,
) )
form_schema_file_name = "" form_schema_file_name = ""
@ -1271,12 +1401,12 @@ def get_file_from_request() -> Any:
return request_file return request_file
def get_process_model(process_model_id: str, process_group_id: str) -> ProcessModelInfo: def get_process_model(process_model_id: str) -> ProcessModelInfo:
"""Get_process_model.""" """Get_process_model."""
process_model = None process_model = None
try: try:
process_model = ProcessModelService().get_process_model( process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id process_model_id
) )
except ProcessEntityNotFoundError as exception: except ProcessEntityNotFoundError as exception:
raise ( raise (

View File

@ -35,7 +35,7 @@ class ErrorHandlingService:
) -> None: ) -> None:
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception.""" """On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
process_model = ProcessModelService().get_process_model( process_model = ProcessModelService().get_process_model(
_processor.process_model_identifier, _processor.process_group_identifier _processor.process_model_identifier
) )
if process_model.fault_or_suspend_on_exception == "suspend": if process_model.fault_or_suspend_on_exception == "suspend":
self.set_instance_status( self.set_instance_status(

View File

@ -54,18 +54,20 @@ class FileSystemService:
@staticmethod @staticmethod
def process_group_path_for_spec(spec: ProcessModelInfo) -> str: def process_group_path_for_spec(spec: ProcessModelInfo) -> str:
"""Category_path_for_spec.""" """Category_path_for_spec."""
return FileSystemService.process_group_path(spec.process_group_id) process_group_id, _ = os.path.split(spec.id)
return FileSystemService.process_group_path(process_group_id)
@staticmethod @staticmethod
def workflow_path(spec: ProcessModelInfo) -> str: def workflow_path(spec: ProcessModelInfo) -> str:
"""Workflow_path.""" """Workflow_path."""
process_group_path = FileSystemService.process_group_path_for_spec(spec) process_model_path = os.path.join(FileSystemService.root_path(), spec.id)
return os.path.join(process_group_path, spec.id) # process_group_path = FileSystemService.process_group_path_for_spec(spec)
return process_model_path
@staticmethod @staticmethod
def full_path_to_process_model_file(spec: ProcessModelInfo, file_name: str) -> str: def full_path_to_process_model_file(spec: ProcessModelInfo) -> str:
"""Full_path_to_process_model_file.""" """Full_path_to_process_model_file."""
return os.path.join(FileSystemService.workflow_path(spec), file_name) return os.path.join(FileSystemService.workflow_path(spec), spec.primary_file_name)
def next_display_order(self, spec: ProcessModelInfo) -> int: def next_display_order(self, spec: ProcessModelInfo) -> int:
"""Next_display_order.""" """Next_display_order."""

View File

@ -120,7 +120,6 @@ class MessageService:
process_instance_receive = ProcessInstanceService.create_process_instance( process_instance_receive = ProcessInstanceService.create_process_instance(
message_triggerable_process_model.process_model_identifier, message_triggerable_process_model.process_model_identifier,
user, user,
process_group_identifier=message_triggerable_process_model.process_group_identifier,
) )
processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.do_engine_steps(save=False) processor_receive.do_engine_steps(save=False)

View File

@ -264,8 +264,7 @@ class ProcessInstanceProcessor:
bpmn_process_spec, bpmn_process_spec,
subprocesses, subprocesses,
) = ProcessInstanceProcessor.get_process_model_and_subprocesses( ) = ProcessInstanceProcessor.get_process_model_and_subprocesses(
process_instance_model.process_model_identifier, process_instance_model.process_model_identifier
process_instance_model.process_group_identifier,
) )
else: else:
bpmn_json_length = len(process_instance_model.bpmn_json.encode("utf-8")) bpmn_json_length = len(process_instance_model.bpmn_json.encode("utf-8"))
@ -316,7 +315,7 @@ class ProcessInstanceProcessor:
check_sub_specs(test_spec, 5) check_sub_specs(test_spec, 5)
self.process_model_identifier = process_instance_model.process_model_identifier self.process_model_identifier = process_instance_model.process_model_identifier
self.process_group_identifier = process_instance_model.process_group_identifier # self.process_group_identifier = process_instance_model.process_group_identifier
try: try:
self.bpmn_process_instance = self.__get_bpmn_process_instance( self.bpmn_process_instance = self.__get_bpmn_process_instance(
@ -351,17 +350,17 @@ class ProcessInstanceProcessor:
@classmethod @classmethod
def get_process_model_and_subprocesses( def get_process_model_and_subprocesses(
cls, process_model_identifier: str, process_group_identifier: str cls, process_model_identifier: str
) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]: ) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]:
"""Get_process_model_and_subprocesses.""" """Get_process_model_and_subprocesses."""
process_model_info = ProcessModelService().get_process_model( process_model_info = ProcessModelService().get_process_model(
process_model_identifier, process_group_identifier process_model_identifier
) )
if process_model_info is None: if process_model_info is None:
raise ( raise (
ApiError( ApiError(
"process_model_not_found", "process_model_not_found",
f"The given process model was not found: {process_group_identifier}/{process_model_identifier}.", f"The given process model was not found: {process_model_identifier}.",
) )
) )
spec_files = SpecFileService.get_files(process_model_info) spec_files = SpecFileService.get_files(process_model_info)
@ -369,12 +368,11 @@ class ProcessInstanceProcessor:
@classmethod @classmethod
def get_bpmn_process_instance_from_process_model( def get_bpmn_process_instance_from_process_model(
cls, process_model_identifier: str, process_group_identifier: str cls, process_model_identifier: str
) -> BpmnWorkflow: ) -> BpmnWorkflow:
"""Get_all_bpmn_process_identifiers_for_process_model.""" """Get_all_bpmn_process_identifiers_for_process_model."""
(bpmn_process_spec, subprocesses) = cls.get_process_model_and_subprocesses( (bpmn_process_spec, subprocesses) = cls.get_process_model_and_subprocesses(
process_model_identifier, process_model_identifier,
process_group_identifier,
) )
return cls.get_bpmn_process_instance_from_workflow_spec( return cls.get_bpmn_process_instance_from_workflow_spec(
bpmn_process_spec, subprocesses bpmn_process_spec, subprocesses
@ -676,7 +674,7 @@ class ProcessInstanceProcessor:
etree_element, etree_element,
) )
return FileSystemService.full_path_to_process_model_file( return FileSystemService.full_path_to_process_model_file(
process_model, process_model.primary_file_name process_model
) )
return None return None
@ -685,6 +683,7 @@ class ProcessInstanceProcessor:
bpmn_process_identifier: str, bpmn_process_identifier: str,
) -> str: ) -> str:
"""Bpmn_file_full_path_from_bpmn_process_identifier.""" """Bpmn_file_full_path_from_bpmn_process_identifier."""
db.session.flush()
bpmn_process_id_lookup = BpmnProcessIdLookup.query.filter_by( bpmn_process_id_lookup = BpmnProcessIdLookup.query.filter_by(
bpmn_process_identifier=bpmn_process_identifier bpmn_process_identifier=bpmn_process_identifier
).first() ).first()

View File

@ -36,7 +36,6 @@ class ProcessInstanceService:
def create_process_instance( def create_process_instance(
process_model_identifier: str, process_model_identifier: str,
user: UserModel, user: UserModel,
process_group_identifier: Optional[str] = None,
) -> ProcessInstanceModel: ) -> ProcessInstanceModel:
"""Get_process_instance_from_spec.""" """Get_process_instance_from_spec."""
current_git_revision = GitService.get_current_revision() current_git_revision = GitService.get_current_revision()
@ -44,7 +43,7 @@ class ProcessInstanceService:
status=ProcessInstanceStatus.not_started.value, status=ProcessInstanceStatus.not_started.value,
process_initiator=user, process_initiator=user,
process_model_identifier=process_model_identifier, process_model_identifier=process_model_identifier,
process_group_identifier=process_group_identifier, process_group_identifier="",
start_in_seconds=round(time.time()), start_in_seconds=round(time.time()),
bpmn_version_control_type="git", bpmn_version_control_type="git",
bpmn_version_control_identifier=current_git_revision, bpmn_version_control_identifier=current_git_revision,
@ -101,7 +100,7 @@ class ProcessInstanceService:
next_task=None, next_task=None,
# navigation=navigation, # navigation=navigation,
process_model_identifier=processor.process_model_identifier, process_model_identifier=processor.process_model_identifier,
process_group_identifier=processor.process_group_identifier, process_group_identifier="",
# total_tasks=len(navigation), # total_tasks=len(navigation),
completed_tasks=processor.process_instance_model.completed_tasks, completed_tasks=processor.process_instance_model.completed_tasks,
updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds, updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds,

View File

@ -74,7 +74,7 @@ class ProcessModelService(FileSystemService):
def save_process_model(self, process_model: ProcessModelInfo) -> None: def save_process_model(self, process_model: ProcessModelInfo) -> None:
"""Save_process_model.""" """Save_process_model."""
spec_path = self.workflow_path(process_model) spec_path = os.path.join(FileSystemService.root_path(), process_model.id)
os.makedirs(spec_path, exist_ok=True) os.makedirs(spec_path, exist_ok=True)
json_path = os.path.join(spec_path, self.WF_JSON_FILE) json_path = os.path.join(spec_path, self.WF_JSON_FILE)
with open(json_path, "w") as wf_json: with open(json_path, "w") as wf_json:
@ -93,7 +93,8 @@ class ProcessModelService(FileSystemService):
message=f"We cannot delete the model `{process_model_id}`, there are existing instances that depend on it.", message=f"We cannot delete the model `{process_model_id}`, there are existing instances that depend on it.",
) )
process_model = self.get_process_model(process_model_id) process_model = self.get_process_model(process_model_id)
path = self.workflow_path(process_model) # path = self.workflow_path(process_model)
path = f"{FileSystemService.root_path()}/{process_model_id}"
shutil.rmtree(path) shutil.rmtree(path)
@classmethod @classmethod
@ -101,36 +102,43 @@ class ProcessModelService(FileSystemService):
cls, relative_path: str cls, relative_path: str
) -> ProcessModelInfo: ) -> ProcessModelInfo:
"""Get_process_model_from_relative_path.""" """Get_process_model_from_relative_path."""
process_group_identifier = os.path.dirname(relative_path) process_group_identifier, _ = os.path.split(relative_path)
process_group = cls().get_process_group(process_group_identifier) process_group = cls().get_process_group(process_group_identifier)
path = os.path.join(FileSystemService.root_path(), relative_path) path = os.path.join(FileSystemService.root_path(), relative_path)
return cls().__scan_spec(path, process_group=process_group) return cls().__scan_spec(path, process_group=process_group)
def get_process_model( def get_process_model(
self, process_model_id: str, group_id: Optional[str] = None self, process_model_id: str
) -> ProcessModelInfo: ) -> ProcessModelInfo:
"""Get a process model from a model and group id.""" """Get a process model from a model and group id.
process_model_id is the full path to the model--including groups"""
if not os.path.exists(FileSystemService.root_path()): if not os.path.exists(FileSystemService.root_path()):
raise ProcessEntityNotFoundError("process_model_not_found") raise ProcessEntityNotFoundError("process_model_root_not_found")
if group_id is not None: model_path = os.path.join(FileSystemService.root_path(), process_model_id)
process_group = self.get_process_group(group_id) if self.is_model(model_path):
if process_group is not None: process_model = self.get_process_model_from_relative_path(process_model_id)
for process_model in process_group.process_models: return process_model
if process_model_id == process_model.id:
return process_model # group_path, model_id = os.path.split(process_model_id)
with os.scandir(FileSystemService.root_path()) as process_group_dirs: # if group_path is not None:
for item in process_group_dirs: # process_group = self.get_process_group(group_path)
process_group_dir = item # if process_group is not None:
if item.is_dir(): # for process_model in process_group.process_models:
with os.scandir(item.path) as spec_dirs: # if process_model_id == process_model.id:
for sd in spec_dirs: # return process_model
if sd.name == process_model_id: # with os.scandir(FileSystemService.root_path()) as process_group_dirs:
# Now we have the process_group direcotry, and spec directory # for item in process_group_dirs:
process_group = self.__scan_process_group( # process_group_dir = item
process_group_dir # if item.is_dir():
) # with os.scandir(item.path) as spec_dirs:
return self.__scan_spec(sd.path, sd.name, process_group) # for sd in spec_dirs:
# if sd.name == process_model_id:
# # Now we have the process_group directory, and spec directory
# process_group = self.__scan_process_group(
# process_group_dir
# )
# return self.__scan_spec(sd.path, sd.name, process_group)
raise ProcessEntityNotFoundError("process_model_not_found") raise ProcessEntityNotFoundError("process_model_not_found")
def get_process_models( def get_process_models(
@ -162,22 +170,22 @@ class ProcessModelService(FileSystemService):
if os.path.exists(FileSystemService.root_path()): if os.path.exists(FileSystemService.root_path()):
process_group_path = os.path.join(FileSystemService.root_path(), process_group_id) process_group_path = os.path.join(FileSystemService.root_path(), process_group_id)
if self.is_group(process_group_path): if self.is_group(process_group_path):
nested_groups = [] return self.__scan_process_group(process_group_path)
process_group_dir = os.scandir(process_group_path) # nested_groups = []
for item in process_group_dir: # process_group_dir = os.scandir(process_group_path)
if self.is_group(item.path): # for item in process_group_dir:
nested_group = self.get_process_group(os.path.join(process_group_path, item.path)) # if self.is_group(item.path):
nested_groups.append(nested_group) # nested_group = self.get_process_group(os.path.join(process_group_path, item.path))
elif self.is_model(item.path): # nested_groups.append(nested_group)
print("get_process_group: ") # elif self.is_model(item.path):
return self.__scan_process_group(process_group_path) # print("get_process_group: ")
# return self.__scan_process_group(process_group_path)
# with os.scandir(FileSystemService.root_path()) as directory_items: # with os.scandir(FileSystemService.root_path()) as directory_items:
# for item in directory_items: # for item in directory_items:
# if item.is_dir() and item.name == process_group_id: # if item.is_dir() and item.name == process_group_id:
# return self.__scan_process_group(item) # return self.__scan_process_group(item)
else: raise ProcessEntityNotFoundError(
raise ProcessEntityNotFoundError(
"process_group_not_found", f"Process Group Id: {process_group_id}" "process_group_not_found", f"Process Group Id: {process_group_id}"
) )
@ -228,13 +236,13 @@ class ProcessModelService(FileSystemService):
for item in directory_items: for item in directory_items:
# if item.is_dir() and not item.name[0] == ".": # if item.is_dir() and not item.name[0] == ".":
if item.is_dir() and self.is_group(item): if item.is_dir() and self.is_group(item):
scanned_process_group = self.__scan_process_group(item) scanned_process_group = self.__scan_process_group(item.path)
process_groups.append(scanned_process_group) process_groups.append(scanned_process_group)
return process_groups return process_groups
def __scan_process_group(self, dir_item: os.DirEntry) -> ProcessGroup: def __scan_process_group(self, dir_path: str) -> ProcessGroup:
"""Reads the process_group.json file, and any nested directories.""" """Reads the process_group.json file, and any nested directories."""
cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE) cat_path = os.path.join(dir_path, self.CAT_JSON_FILE)
if os.path.exists(cat_path): if os.path.exists(cat_path):
with open(cat_path) as cat_json: with open(cat_path) as cat_json:
data = json.load(cat_json) data = json.load(cat_json)
@ -242,18 +250,19 @@ class ProcessModelService(FileSystemService):
if process_group is None: if process_group is None:
raise ApiError( raise ApiError(
error_code="process_group_could_not_be_loaded_from_disk", error_code="process_group_could_not_be_loaded_from_disk",
message=f"We could not load the process_group from disk from: {dir_item}", message=f"We could not load the process_group from disk from: {dir_path}",
) )
else: else:
process_group_id = dir_path.replace(FileSystemService.root_path(), '')
process_group = ProcessGroup( process_group = ProcessGroup(
id=dir_item.name, id=process_group_id,
display_name=dir_item.name, display_name=process_group_id,
display_order=10000, display_order=10000,
admin=False, admin=False,
) )
with open(cat_path, "w") as wf_json: with open(cat_path, "w") as wf_json:
json.dump(self.GROUP_SCHEMA.dump(process_group), wf_json, indent=4) json.dump(self.GROUP_SCHEMA.dump(process_group), wf_json, indent=4)
with os.scandir(dir_item.path) as nested_items: with os.scandir(dir_path) as nested_items:
process_group.process_models = [] process_group.process_models = []
for nested_item in nested_items: for nested_item in nested_items:
if nested_item.is_dir(): if nested_item.is_dir():
@ -282,6 +291,8 @@ class ProcessModelService(FileSystemService):
if os.path.exists(spec_path): if os.path.exists(spec_path):
with open(spec_path) as wf_json: with open(spec_path) as wf_json:
data = json.load(wf_json) data = json.load(wf_json)
if "process_group_id" in data:
data.pop("process_group_id")
spec = ProcessModelInfo(**data) spec = ProcessModelInfo(**data)
if spec is None: if spec is None:
raise ApiError( raise ApiError(

View File

@ -42,7 +42,8 @@ class SpecFileService(FileSystemService):
extension_filter: str = "", extension_filter: str = "",
) -> List[File]: ) -> List[File]:
"""Return all files associated with a workflow specification.""" """Return all files associated with a workflow specification."""
path = SpecFileService.workflow_path(process_model_info) # path = SpecFileService.workflow_path(process_model_info)
path = os.path.join(FileSystemService.root_path(), process_model_info.id)
files = SpecFileService._get_files(path, file_name) files = SpecFileService._get_files(path, file_name)
if extension_filter != "": if extension_filter != "":
files = list( files = list(
@ -64,7 +65,8 @@ class SpecFileService(FileSystemService):
) -> File: ) -> File:
"""Update_file.""" """Update_file."""
SpecFileService.assert_valid_file_name(file_name) SpecFileService.assert_valid_file_name(file_name)
file_path = SpecFileService.file_path(process_model_info, file_name) # file_path = SpecFileService.file_path(process_model_info, file_name)
file_path = os.path.join(FileSystemService.root_path(), process_model_info.id, file_name)
SpecFileService.write_file_data_to_system(file_path, binary_data) SpecFileService.write_file_data_to_system(file_path, binary_data)
file = SpecFileService.to_file_object(file_name, file_path) file = SpecFileService.to_file_object(file_name, file_path)
@ -88,7 +90,8 @@ class SpecFileService(FileSystemService):
@staticmethod @staticmethod
def get_data(process_model_info: ProcessModelInfo, file_name: str) -> bytes: def get_data(process_model_info: ProcessModelInfo, file_name: str) -> bytes:
"""Get_data.""" """Get_data."""
file_path = SpecFileService.file_path(process_model_info, file_name) # file_path = SpecFileService.file_path(process_model_info, file_name)
file_path = os.path.join(FileSystemService.root_path(), process_model_info.id, file_name)
if not os.path.exists(file_path): if not os.path.exists(file_path):
raise ApiError( raise ApiError(
"unknown_file", "unknown_file",
@ -123,7 +126,8 @@ class SpecFileService(FileSystemService):
# for lf in lookup_files: # for lf in lookup_files:
# session.query(LookupDataModel).filter_by(lookup_file_model_id=lf.id).delete() # session.query(LookupDataModel).filter_by(lookup_file_model_id=lf.id).delete()
# session.query(LookupFileModel).filter_by(id=lf.id).delete() # session.query(LookupFileModel).filter_by(id=lf.id).delete()
file_path = SpecFileService.file_path(spec, file_name) # file_path = SpecFileService.file_path(spec, file_name)
file_path = os.path.join(FileSystemService.root_path(), spec.id, file_name)
os.remove(file_path) os.remove(file_path)
@staticmethod @staticmethod
@ -327,9 +331,8 @@ class SpecFileService(FileSystemService):
process_model_info: ProcessModelInfo, bpmn_file_name: str, et_root: _Element process_model_info: ProcessModelInfo, bpmn_file_name: str, et_root: _Element
) -> None: ) -> None:
"""Store_bpmn_process_identifiers.""" """Store_bpmn_process_identifiers."""
relative_process_model_path = SpecFileService.process_model_relative_path( relative_process_model_path = process_model_info.id
process_model_info
)
relative_bpmn_file_path = os.path.join( relative_bpmn_file_path = os.path.join(
relative_process_model_path, bpmn_file_name relative_process_model_path, bpmn_file_name
) )
@ -425,7 +428,7 @@ class SpecFileService(FileSystemService):
message_triggerable_process_model = MessageTriggerableProcessModel( message_triggerable_process_model = MessageTriggerableProcessModel(
message_model_id=message_model.id, message_model_id=message_model.id,
process_model_identifier=process_model_info.id, process_model_identifier=process_model_info.id,
process_group_identifier=process_model_info.process_group_id, process_group_identifier="process_group_identifier",
) )
db.session.add(message_triggerable_process_model) db.session.add(message_triggerable_process_model)
db.session.commit() db.session.commit()
@ -433,8 +436,8 @@ class SpecFileService(FileSystemService):
if ( if (
message_triggerable_process_model.process_model_identifier message_triggerable_process_model.process_model_identifier
!= process_model_info.id != process_model_info.id
or message_triggerable_process_model.process_group_identifier # or message_triggerable_process_model.process_group_identifier
!= process_model_info.process_group_id # != process_model_info.process_group_id
): ):
raise ValidationException( raise ValidationException(
"Message model is already used to start process model" "Message model is already used to start process model"

View File

@ -12,6 +12,10 @@ from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from flask_bpmn.api.api_error import ApiError from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from werkzeug.test import TestResponse # type: ignore from werkzeug.test import TestResponse # type: ignore
@ -34,6 +38,43 @@ from spiffworkflow_backend.services.user_service import UserService
class BaseTest: class BaseTest:
"""BaseTest.""" """BaseTest."""
def basic_test_setup(
self,
client: FlaskClient,
user: UserModel,
process_group_id: Optional[str] = "test_group",
process_model_id: Optional[str] = "random_fact",
bpmn_file_name: Optional[str] = None,
bpmn_file_location: Optional[str] = None
) -> str:
"""Creates a process group
Creates a process model
Adds a bpmn file to the model"""
process_group_display_name = process_group_id
process_group_description = process_group_id
process_model_identifier = f"{process_group_id}/{process_model_id}"
if bpmn_file_location is None:
bpmn_file_location = process_model_id
self.create_process_group(client, user, process_group_id, process_group_display_name)
self.create_process_model_with_api(
client,
process_model_id=process_model_identifier,
process_model_display_name=process_group_display_name,
process_model_description=process_group_description,
user=user,
)
load_test_spec(
process_model_id=process_model_identifier,
bpmn_file_name=bpmn_file_name,
process_model_source_directory=bpmn_file_location
)
return process_model_identifier
@staticmethod @staticmethod
def find_or_create_user(username: str = "test_user_1") -> UserModel: def find_or_create_user(username: str = "test_user_1") -> UserModel:
"""Find_or_create_user.""" """Find_or_create_user."""
@ -67,17 +108,18 @@ class BaseTest:
open_id_client_secret_key, open_id_client_secret_key,
) )
@staticmethod
def create_process_instance( def create_process_instance(
self,
client: FlaskClient, client: FlaskClient,
test_process_group_id: str,
test_process_model_id: str, test_process_model_id: str,
headers: Dict[str, str], headers: Dict[str, str],
) -> TestResponse: ) -> TestResponse:
"""Create_process_instance.""" """Create_process_instance.
load_test_spec(test_process_model_id, process_group_id=test_process_group_id) There must be an existing process model to instantiate."""
modified_process_model_id = test_process_model_id.replace("/", ":")
response = client.post( response = client.post(
f"/v1.0/process-models/{test_process_group_id}/{test_process_model_id}/process-instances", f"/v1.0/process-models/{modified_process_model_id}/process-instances",
headers=headers, headers=headers,
) )
assert response.status_code == 201 assert response.status_code == 201
@ -86,8 +128,7 @@ class BaseTest:
def create_process_model_with_api( def create_process_model_with_api(
self, self,
client: FlaskClient, client: FlaskClient,
process_group_id: Optional[str] = None, process_model_id: Optional[str] = None,
process_model_id: str = "make_cookies",
process_model_display_name: str = "Cooooookies", process_model_display_name: str = "Cooooookies",
process_model_description: str = "Om nom nom delicious cookies", process_model_description: str = "Om nom nom delicious cookies",
fault_or_suspend_on_exception: str = NotificationType.suspend.value, fault_or_suspend_on_exception: str = NotificationType.suspend.value,
@ -97,65 +138,74 @@ class BaseTest:
user: Optional[UserModel] = None, user: Optional[UserModel] = None,
) -> TestResponse: ) -> TestResponse:
"""Create_process_model.""" """Create_process_model."""
process_model_service = ProcessModelService()
# make sure we have a group if process_model_id is not None:
if process_group_id is None:
process_group_tmp = ProcessGroup( # make sure we have a group
id="test_cat", process_group_id, _ = os.path.split(process_model_id)
display_name="Test Category", process_group_path = f"{FileSystemService.root_path()}/{process_group_id}"
display_order=0, if ProcessModelService().is_group(process_group_path):
admin=False,
) if exception_notification_addresses is None:
process_group = process_model_service.add_process_group(process_group_tmp) exception_notification_addresses = []
model = ProcessModelInfo(
id=process_model_id,
display_name=process_model_display_name,
description=process_model_description,
is_review=False,
primary_process_id=primary_process_id,
primary_file_name=primary_file_name,
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
exception_notification_addresses=exception_notification_addresses,
)
if user is None:
user = self.find_or_create_user()
response = client.post(
"/v1.0/process-models",
content_type="application/json",
data=json.dumps(ProcessModelInfoSchema().dump(model)),
headers=self.logged_in_headers(user),
)
assert response.status_code == 201
return response
else:
raise Exception("You must create the group first")
else: else:
process_group = ProcessModelService().get_process_group(process_group_id) raise Exception("You must include the process_model_id, which must be a path to the model")
if exception_notification_addresses is None:
exception_notification_addresses = []
model = ProcessModelInfo(
id=process_model_id,
display_name=process_model_display_name,
description=process_model_description,
process_group_id=process_group.id,
is_review=False,
primary_process_id=primary_process_id,
primary_file_name=primary_file_name,
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
exception_notification_addresses=exception_notification_addresses,
)
if user is None:
user = self.find_or_create_user()
response = client.post(
"/v1.0/process-models",
content_type="application/json",
data=json.dumps(ProcessModelInfoSchema().dump(model)),
headers=self.logged_in_headers(user),
)
assert response.status_code == 201
return response
def create_spec_file( def create_spec_file(
self, self,
client: FlaskClient, client: FlaskClient,
process_group_id: str = "random_fact", process_model_id: str,
process_model_id: str = "random_fact", process_model_location: Optional[str] = None,
process_model: Optional[ProcessModelInfo] = None, process_model: Optional[ProcessModelInfo] = None,
file_name: str = "random_fact.svg", file_name: str = "random_fact.svg",
file_data: bytes = b"abcdef", file_data: bytes = b"abcdef",
user: Optional[UserModel] = None, user: Optional[UserModel] = None,
) -> Any: ) -> Any:
"""Test_create_spec_file.""" """Test_create_spec_file.
Adds a bpmn file to the model.
process_model_id is the destination path
process_model_location is the source path
because of permissions, user might be required now..., not sure yet"""
if process_model_location is None:
process_model_location = file_name.split(".")[0]
if process_model is None: if process_model is None:
process_model = load_test_spec( process_model = load_test_spec(
process_model_id, process_group_id=process_group_id process_model_id=process_model_id,
bpmn_file_name=file_name,
process_model_source_directory=process_model_location
) )
data = {"file": (io.BytesIO(file_data), file_name)} data = {"file": (io.BytesIO(file_data), file_name)}
if user is None: if user is None:
user = self.find_or_create_user() user = self.find_or_create_user()
modified_process_model_id = process_model.id.replace("/", ":")
response = client.post( response = client.post(
f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}/files", f"/v1.0/process-models/{modified_process_model_id}/files",
data=data, data=data,
follow_redirects=True, follow_redirects=True,
content_type="multipart/form-data", content_type="multipart/form-data",
@ -168,7 +218,7 @@ class BaseTest:
# assert "image/svg+xml" == file["content_type"] # assert "image/svg+xml" == file["content_type"]
response = client.get( response = client.get(
f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}/files/{file_name}", f"/v1.0/process-models/{modified_process_model_id}/files/{file_name}",
headers=self.logged_in_headers(user), headers=self.logged_in_headers(user),
) )
assert response.status_code == 200 assert response.status_code == 200
@ -221,7 +271,7 @@ class BaseTest:
status=status, status=status,
process_initiator=user, process_initiator=user,
process_model_identifier=process_model.id, process_model_identifier=process_model.id,
process_group_identifier=process_model.process_group_id, process_group_identifier="",
updated_at_in_seconds=round(time.time()), updated_at_in_seconds=round(time.time()),
start_in_seconds=current_time - (3600 * 1), start_in_seconds=current_time - (3600 * 1),
end_in_seconds=current_time - (3600 * 1 - 20), end_in_seconds=current_time - (3600 * 1 - 20),

View File

@ -13,27 +13,30 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
class ExampleDataLoader: class ExampleDataLoader:
"""ExampleDataLoader.""" """ExampleDataLoader."""
@staticmethod
def create_spec( def create_spec(
self,
process_model_id: str, process_model_id: str,
display_name: str = "", display_name: str = "",
description: str = "", description: str = "",
process_group_id: str = "",
display_order: int = 0, display_order: int = 0,
from_tests: bool = False, # from_tests: bool = False,
bpmn_file_name: Optional[str] = None, bpmn_file_name: Optional[str] = None,
process_model_source_directory: Optional[str] = None, process_model_source_directory: str = None,
) -> ProcessModelInfo: ) -> ProcessModelInfo:
"""Assumes that a directory exists in static/bpmn with the same name as the given process_model_id. """Assumes that process_model_source_directory exists in static/bpmn and contains bpmn_file_name.
further assumes that the [process_model_id].bpmn is the primary file for the process model. further assumes that bpmn_file_name is the primary file for the process model.
returns an array of data models to be added to the database.
if bpmn_file_name is None we load all files in process_model_source_directory,
otherwise, we only load bpmn_file_name
""" """
if process_model_source_directory is None:
raise Exception("You must include `process_model_source_directory`.")
spec = ProcessModelInfo( spec = ProcessModelInfo(
id=process_model_id, id=process_model_id,
display_name=display_name, display_name=display_name,
description=description, description=description,
process_group_id=process_group_id,
display_order=display_order, display_order=display_order,
is_review=False, is_review=False,
) )
@ -55,25 +58,16 @@ class ExampleDataLoader:
if bpmn_file_name: if bpmn_file_name:
file_name_matcher = bpmn_file_name_with_extension file_name_matcher = bpmn_file_name_with_extension
file_glob = "" # file_glob = ""
if from_tests: file_glob = os.path.join(
file_glob = os.path.join( current_app.root_path,
current_app.instance_path, "..",
"..", "..",
"..", "tests",
"tests", "data",
"data", process_model_source_directory_to_use,
process_model_source_directory_to_use, file_name_matcher,
file_name_matcher, )
)
else:
file_glob = os.path.join(
current_app.root_path,
"static",
"bpmn",
process_model_source_directory_to_use,
file_name_matcher,
)
files = glob.glob(file_glob) files = glob.glob(file_glob)
for file_path in files: for file_path in files:

View File

@ -37,40 +37,18 @@ def assure_process_group_exists(process_group_id: Optional[str] = None) -> Proce
def load_test_spec( def load_test_spec(
process_model_id: str, process_model_id: str,
process_group_id: Optional[str] = None,
bpmn_file_name: Optional[str] = None, bpmn_file_name: Optional[str] = None,
process_model_source_directory: Optional[str] = None, process_model_source_directory: str = None,
) -> ProcessModelInfo: ) -> ProcessModelInfo:
"""Loads a process model into the bpmn dir based on a directory in tests/data.""" """Loads a bpmn file into the process model dir based on a directory in tests/data."""
process_group = None
process_model_service = ProcessModelService()
if process_group_id is None:
process_group_id = "test_process_group_id"
process_group = assure_process_group_exists(process_group_id)
process_group_id = process_group.id
try: if process_model_source_directory is None:
return process_model_service.get_process_model( raise Exception("You must inclode a `process_model_source_directory`.")
process_model_id, group_id=process_group_id
)
except ProcessEntityNotFoundError:
spec = ExampleDataLoader().create_spec(
process_model_id=process_model_id,
from_tests=True,
display_name=process_model_id,
process_group_id=process_group_id,
bpmn_file_name=bpmn_file_name,
process_model_source_directory=process_model_source_directory,
)
return spec
spec = ExampleDataLoader.create_spec(
# def user_info_to_query_string(user_info, redirect_url): process_model_id=process_model_id,
# query_string_list = [] display_name=process_model_id,
# items = user_info.items() bpmn_file_name=bpmn_file_name,
# for key, value in items: process_model_source_directory=process_model_source_directory,
# query_string_list.append('%s=%s' % (key, urllib.parse.quote(value))) )
# return spec
# query_string_list.append('redirect_url=%s' % redirect_url)
#
# return '?%s' % '&'.join(query_string_list)

View File

@ -19,20 +19,43 @@ class TestLoggingService(BaseTest):
"""Test_process_instance_run.""" """Test_process_instance_run."""
process_group_id = "test_logging_spiff_logger" process_group_id = "test_logging_spiff_logger"
process_model_id = "simple_script" process_model_id = "simple_script"
self.create_process_group(client=client, user=with_super_admin_user, process_group_id=process_group_id)
process_model_identifier = f"{process_group_id}/{process_model_id}"
# create the model
process_model_info = self.create_process_model_with_api(
client=client,
process_model_id=process_model_identifier,
process_model_display_name="Simple Script",
process_model_description="Simple Script",
user=with_super_admin_user
)
bpmn_file_name = "simple_script.bpmn"
bpmn_file_data_bytes = self.get_test_data_file_contents(
bpmn_file_name, "simple_script"
)
# add bpmn to the model
self.create_spec_file(
client=client,
process_model_id=process_model_identifier,
file_name=bpmn_file_name,
file_data=bpmn_file_data_bytes,
user=with_super_admin_user
)
headers = self.logged_in_headers(with_super_admin_user) headers = self.logged_in_headers(with_super_admin_user)
response = self.create_process_instance( response = self.create_process_instance(
client, process_group_id, process_model_id, headers client, process_model_identifier, headers
) )
assert response.json is not None assert response.json is not None
process_instance_id = response.json["id"] process_instance_id = response.json["id"]
response = client.post( response = client.post(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run", f"/v1.0/process-instances/{process_instance_id}/run",
headers=headers, headers=headers,
) )
assert response.status_code == 200 assert response.status_code == 200
log_response = client.get( log_response = client.get(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/logs", f"/v1.0/process-instances/{process_instance_id}/logs",
headers=headers, headers=headers,
) )
assert log_response.status_code == 200 assert log_response.status_code == 200

View File

@ -17,7 +17,7 @@ class TestNestedGroups(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
# /process-groups/{process_group_path}/show # /process-groups/{process_group_path}/show
target_uri = "/v1.0/process-groups/group_a,group_b/show" target_uri = "/v1.0/process-groups/group_a,group_b"
user = self.find_or_create_user() user = self.find_or_create_user()
self.add_permissions_to_user( self.add_permissions_to_user(
user, target_uri=target_uri, permission_names=["read"] user, target_uri=target_uri, permission_names=["read"]
@ -117,7 +117,6 @@ class TestNestedGroups(BaseTest):
id="process_model", id="process_model",
display_name="Process Model", display_name="Process Model",
description="Process Model", description="Process Model",
process_group_id="group_a/group_b",
primary_file_name="primary_file.bpmn", primary_file_name="primary_file.bpmn",
primary_process_id="primary_process_id", primary_process_id="primary_process_id",
display_order=0 display_order=0
@ -129,3 +128,45 @@ class TestNestedGroups(BaseTest):
data=json.dumps(ProcessModelInfoSchema().dump(process_model)) data=json.dumps(ProcessModelInfoSchema().dump(process_model))
) )
print("test_process_model_add") print("test_process_model_add")
def test_process_group_show(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
# target_uri = "/process-groups/{process_group_id}"
# user = self.find_or_create_user("testadmin1")
# self.add_permissions_to_user(
# user, target_uri="v1.0/process-groups", permission_names=["read", "create"]
# )
# self.add_permissions_to_user(
# user, target_uri="/process-groups/{process_group_id}", permission_names=["read", "create"]
# )
process_group_a = ProcessGroup(
id="group_a",
display_name="Group A",
display_order=0,
admin=False,
)
response_create_a = client.post(
"/v1.0/process-groups",
headers=self.logged_in_headers(with_super_admin_user),
content_type="application/json",
data=json.dumps(ProcessGroupSchema().dump(process_group_a)),
)
target_uri = "/v1.0/process-groups/group_a"
user = self.find_or_create_user()
self.add_permissions_to_user(
user, target_uri=target_uri, permission_names=["read"]
)
response = client.get(
target_uri,
headers=self.logged_in_headers(user)
)
print("test_process_group_show: ")

View File

@ -42,16 +42,16 @@ class SecretServiceTestHelpers(BaseTest):
self.test_process_group_id, self.test_process_group_id,
display_name=self.test_process_group_display_name, display_name=self.test_process_group_display_name,
) )
process_model_identifier = f"{self.test_process_group_id}/{self.test_process_model_id}"
self.create_process_model_with_api( self.create_process_model_with_api(
client, client,
process_group_id=self.test_process_group_id, process_model_id=process_model_identifier,
process_model_id=self.test_process_model_id,
process_model_display_name=self.test_process_model_display_name, process_model_display_name=self.test_process_model_display_name,
process_model_description=self.test_process_model_description, process_model_description=self.test_process_model_description,
user=user, user=user,
) )
process_model_info = ProcessModelService().get_process_model( process_model_info = ProcessModelService().get_process_model(
self.test_process_model_id, self.test_process_group_id process_model_identifier
) )
return process_model_info return process_model_info

View File

@ -29,8 +29,14 @@ class TestGetLocaltime(BaseTest):
) -> None: ) -> None:
"""Test_process_instance_run.""" """Test_process_instance_run."""
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
self.add_permissions_to_user(
initiator_user, target_uri="/v1.0/process-groups", permission_names=["read", "create"]
)
self.create_process_group(client=client, user=initiator_user, process_group_id="test_group")
process_model = load_test_spec( process_model = load_test_spec(
process_model_id="get_localtime", bpmn_file_name="get_localtime.bpmn" process_model_id="test_group/get_localtime",
bpmn_file_name="get_localtime.bpmn",
process_model_source_directory="get_localtime"
) )
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user process_model=process_model, user=initiator_user

View File

@ -1,10 +1,13 @@
"""Test_message_service.""" """Test_message_service."""
import pytest import pytest
from flask import Flask from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserNotFoundError from spiffworkflow_backend.models.user import UserModel, UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
@ -89,7 +92,11 @@ class TestAuthorizationService(BaseTest):
) )
def test_user_can_be_added_to_active_task_on_first_login( def test_user_can_be_added_to_active_task_on_first_login(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_user_can_be_added_to_active_task_on_first_login.""" """Test_user_can_be_added_to_active_task_on_first_login."""
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
@ -98,8 +105,17 @@ class TestAuthorizationService(BaseTest):
self.find_or_create_user("testuser1") self.find_or_create_user("testuser1")
AuthorizationService.import_permissions_from_yaml_file() AuthorizationService.import_permissions_from_yaml_file()
process_model = load_test_spec( process_model_identifier = self.basic_test_setup(
process_model_id="model_with_lanes", bpmn_file_name="lanes.bpmn" client=client,
user=with_super_admin_user,
process_group_id="test_group",
process_model_id="model_with_lanes",
bpmn_file_name="lanes.bpmn",
bpmn_file_location="model_with_lanes"
)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
) )
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user process_model=process_model, user=initiator_user

View File

@ -1,8 +1,12 @@
"""Test_various_bpmn_constructs.""" """Test_various_bpmn_constructs."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
@ -15,21 +19,30 @@ class TestDotNotation(BaseTest):
"""TestVariousBpmnConstructs.""" """TestVariousBpmnConstructs."""
def test_dot_notation( def test_dot_notation(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_form_data_conversion_to_dot_dict.""" """Test_form_data_conversion_to_dot_dict."""
process_model = load_test_spec( process_group_id = "dot_notation_group"
"test_dot_notation", process_model_id = "test_dot_notation"
bpmn_file_name="diagram.bpmn", bpmn_file_name = "diagram.bpmn"
process_model_source_directory="dot_notation", bpmn_file_location = "dot_notation"
process_model_identifier = self.basic_test_setup(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location
) )
current_user = self.find_or_create_user()
process_instance = self.create_process_instance_from_process_model( headers = self.logged_in_headers(with_super_admin_user)
process_model response = self.create_process_instance(
client, process_model_identifier, headers
) )
process_instance_id = response.json["id"]
process_instance = ProcessInstanceService().get_process_instance(process_instance_id)
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
user_task = processor.get_ready_user_tasks()[0] user_task = processor.get_ready_user_tasks()[0]
@ -41,7 +54,7 @@ class TestDotNotation(BaseTest):
"invoice.dueDate": "09/30/2022", "invoice.dueDate": "09/30/2022",
} }
ProcessInstanceService.complete_form_task( ProcessInstanceService.complete_form_task(
processor, user_task, form_data, current_user processor, user_task, form_data, with_super_admin_user
) )
expected = { expected = {

View File

@ -1,24 +1,46 @@
"""Test_message_instance.""" """Test_message_instance."""
import pytest import pytest
from flask import Flask from flask import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.user import UserModel
class TestMessageInstance(BaseTest): class TestMessageInstance(BaseTest):
"""TestMessageInstance.""" """TestMessageInstance."""
def setup_message_tests(self, client: FlaskClient, user: UserModel) -> str:
process_group_id = "test_group"
process_model_id = "hello_world"
bpmn_file_name = "hello_world.bpmn"
bpmn_file_location = "hello_world"
process_model_identifier = self.basic_test_setup(
client,
user,
process_group_id=process_group_id,
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location
)
return process_model_identifier
def test_can_create_message_instance( def test_can_create_message_instance(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_create_message_instance.""" """Test_can_create_message_instance."""
message_model_identifier = "message_model_one" message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier) message_model = self.create_message_model(message_model_identifier)
process_model = load_test_spec("hello_world") process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
)
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model, "waiting" process_model, "waiting"
) )
@ -40,12 +62,16 @@ class TestMessageInstance(BaseTest):
assert queued_message_from_query is not None assert queued_message_from_query is not None
def test_cannot_set_invalid_status( def test_cannot_set_invalid_status(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_cannot_set_invalid_status.""" """Test_cannot_set_invalid_status."""
message_model_identifier = "message_model_one" message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier) message_model = self.create_message_model(message_model_identifier)
process_model = load_test_spec("hello_world") process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
)
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model, "waiting" process_model, "waiting"
) )
@ -76,12 +102,16 @@ class TestMessageInstance(BaseTest):
) )
def test_cannot_set_invalid_message_type( def test_cannot_set_invalid_message_type(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_cannot_set_invalid_message_type.""" """Test_cannot_set_invalid_message_type."""
message_model_identifier = "message_model_one" message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier) message_model = self.create_message_model(message_model_identifier)
process_model = load_test_spec("hello_world") process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
)
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model, "waiting" process_model, "waiting"
) )
@ -113,12 +143,16 @@ class TestMessageInstance(BaseTest):
) )
def test_force_failure_cause_if_status_is_failure( def test_force_failure_cause_if_status_is_failure(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_force_failure_cause_if_status_is_failure.""" """Test_force_failure_cause_if_status_is_failure."""
message_model_identifier = "message_model_one" message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier) message_model = self.create_message_model(message_model_identifier)
process_model = load_test_spec("hello_world") process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
)
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model, "waiting" process_model, "waiting"
) )
@ -154,7 +188,8 @@ class TestMessageInstance(BaseTest):
assert queued_message.id is not None assert queued_message.id is not None
assert queued_message.failure_cause == "THIS TEST FAILURE" assert queued_message.failure_cause == "THIS TEST FAILURE"
def create_message_model(self, message_model_identifier: str) -> MessageModel: @staticmethod
def create_message_model(message_model_identifier: str) -> MessageModel:
"""Create_message_model.""" """Create_message_model."""
message_model = MessageModel(identifier=message_model_identifier) message_model = MessageModel(identifier=message_model_identifier)
db.session.add(message_model) db.session.add(message_model)

View File

@ -1,5 +1,6 @@
"""Test_message_service.""" """Test_message_service."""
from flask import Flask from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -9,6 +10,7 @@ from spiffworkflow_backend.models.message_correlation_message_instance import (
) )
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
@ -22,25 +24,26 @@ class TestMessageService(BaseTest):
"""TestMessageService.""" """TestMessageService."""
def test_can_send_message_to_waiting_message( def test_can_send_message_to_waiting_message(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_send_message_to_waiting_message.""" """Test_can_send_message_to_waiting_message."""
process_model_sender = load_test_spec( process_group_id = "test_group"
"message_sender", self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender",
)
load_test_spec( load_test_spec(
"message_receiver", "test_group/message_receiver",
process_model_source_directory="message_send_one_conversation", process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver", bpmn_file_name="message_receiver.bpmn"
)
process_model_sender = load_test_spec(
"test_group/message_sender",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn",
) )
user = self.find_or_create_user()
process_instance_sender = ProcessInstanceService.create_process_instance( process_instance_sender = ProcessInstanceService.create_process_instance(
process_model_sender.id, process_model_sender.id,
user, with_super_admin_user,
process_group_identifier=process_model_sender.process_group_id,
) )
processor_sender = ProcessInstanceProcessor(process_instance_sender) processor_sender = ProcessInstanceProcessor(process_instance_sender)
@ -115,21 +118,25 @@ class TestMessageService(BaseTest):
assert process_instance.status == "complete" assert process_instance.status == "complete"
def test_can_send_message_to_multiple_process_models( def test_can_send_message_to_multiple_process_models(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_send_message_to_multiple_process_models.""" """Test_can_send_message_to_multiple_process_models."""
process_group_id = "test_group"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_sender = load_test_spec( process_model_sender = load_test_spec(
"message_sender", "test_group/message_sender",
process_model_source_directory="message_send_two_conversations", process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_sender", bpmn_file_name="message_sender",
) )
load_test_spec( load_test_spec(
"message_receiver_one", "test_group/message_receiver_one",
process_model_source_directory="message_send_two_conversations", process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_receiver_one", bpmn_file_name="message_receiver_one",
) )
load_test_spec( load_test_spec(
"message_receiver_two", "test_group/message_receiver_two",
process_model_source_directory="message_send_two_conversations", process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_receiver_two", bpmn_file_name="message_receiver_two",
) )
@ -139,7 +146,7 @@ class TestMessageService(BaseTest):
process_instance_sender = ProcessInstanceService.create_process_instance( process_instance_sender = ProcessInstanceService.create_process_instance(
process_model_sender.id, process_model_sender.id,
user, user,
process_group_identifier=process_model_sender.process_group_id, # process_group_identifier=process_model_sender.process_group_id,
) )
processor_sender = ProcessInstanceProcessor(process_instance_sender) processor_sender = ProcessInstanceProcessor(process_instance_sender)
@ -189,24 +196,24 @@ class TestMessageService(BaseTest):
assert len(process_instance_result) == 3 assert len(process_instance_result) == 3
process_instance_receiver_one = ProcessInstanceModel.query.filter_by( process_instance_receiver_one = ProcessInstanceModel.query.filter_by(
process_model_identifier="message_receiver_one" process_model_identifier="test_group/message_receiver_one"
).first() ).first()
assert process_instance_receiver_one is not None assert process_instance_receiver_one is not None
process_instance_receiver_two = ProcessInstanceModel.query.filter_by( process_instance_receiver_two = ProcessInstanceModel.query.filter_by(
process_model_identifier="message_receiver_two" process_model_identifier="test_group/message_receiver_two"
).first() ).first()
assert process_instance_receiver_two is not None assert process_instance_receiver_two is not None
# just make sure it's a different process instance # just make sure it's a different process instance
assert ( assert (
process_instance_receiver_one.process_model_identifier process_instance_receiver_one.process_model_identifier
== "message_receiver_one" == "test_group/message_receiver_one"
) )
assert process_instance_receiver_one.id != process_instance_sender.id assert process_instance_receiver_one.id != process_instance_sender.id
assert process_instance_receiver_one.status == "complete" assert process_instance_receiver_one.status == "complete"
assert ( assert (
process_instance_receiver_two.process_model_identifier process_instance_receiver_two.process_model_identifier
== "message_receiver_two" == "test_group/message_receiver_two"
) )
assert process_instance_receiver_two.id != process_instance_sender.id assert process_instance_receiver_two.id != process_instance_sender.id
assert process_instance_receiver_two.status == "complete" assert process_instance_receiver_two.status == "complete"

View File

@ -1,5 +1,6 @@
"""Test Permissions.""" """Test Permissions."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -8,6 +9,7 @@ from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.principal import PrincipalModel from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
@ -22,13 +24,15 @@ class TestPermissions(BaseTest):
"""TestPermissions.""" """TestPermissions."""
def test_user_can_be_given_permission_to_administer_process_group( def test_user_can_be_given_permission_to_administer_process_group(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_user_can_be_given_permission_to_administer_process_group.""" """Test_user_can_be_given_permission_to_administer_process_group."""
process_group_id = "group-a" process_group_id = "group-a"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
load_test_spec( load_test_spec(
"timers_intermediate_catch_event", "group-a/timers_intermediate_catch_event",
process_group_id=process_group_id, bpmn_file_name="timers_intermediate_catch_event.bpmn",
process_model_source_directory="timers_intermediate_catch_event"
) )
dan = self.find_or_create_user() dan = self.find_or_create_user()
principal = dan.principal principal = dan.principal
@ -55,8 +59,9 @@ class TestPermissions(BaseTest):
process_group_b_id = process_group_ids[1] process_group_b_id = process_group_ids[1]
for process_group_id in process_group_ids: for process_group_id in process_group_ids:
load_test_spec( load_test_spec(
"timers_intermediate_catch_event", f"{process_group_id}/timers_intermediate_catch_event",
process_group_id=process_group_id, bpmn_file_name="timers_intermediate_catch_event",
process_model_source_directory="timers_intermediate_catch_event"
) )
group_a_admin = self.find_or_create_user() group_a_admin = self.find_or_create_user()
@ -86,11 +91,11 @@ class TestPermissions(BaseTest):
"""Test_user_can_be_granted_access_through_a_group.""" """Test_user_can_be_granted_access_through_a_group."""
process_group_ids = ["group-a", "group-b"] process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0] process_group_a_id = process_group_ids[0]
process_group_ids[1]
for process_group_id in process_group_ids: for process_group_id in process_group_ids:
load_test_spec( load_test_spec(
"timers_intermediate_catch_event", f"{process_group_id}/timers_intermediate_catch_event",
process_group_id=process_group_id, bpmn_file_name="timers_intermediate_catch_event.bpmn",
process_model_source_directory="timers_intermediate_catch_event"
) )
user = self.find_or_create_user() user = self.find_or_create_user()
group = GroupModel(identifier="groupA") group = GroupModel(identifier="groupA")
@ -127,8 +132,9 @@ class TestPermissions(BaseTest):
process_group_b_id = process_group_ids[1] process_group_b_id = process_group_ids[1]
for process_group_id in process_group_ids: for process_group_id in process_group_ids:
load_test_spec( load_test_spec(
"timers_intermediate_catch_event", f"{process_group_id}/timers_intermediate_catch_event",
process_group_id=process_group_id, bpmn_file_name="timers_intermediate_catch_event.bpmn",
process_model_source_directory="timers_intermediate_catch_event"
) )
group_a_admin = self.find_or_create_user() group_a_admin = self.find_or_create_user()

View File

@ -2,11 +2,13 @@
import pytest import pytest
from flask import g from flask import g
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import ( from spiffworkflow_backend.services.authorization_service import (
UserDoesNotHaveAccessToTaskError, UserDoesNotHaveAccessToTaskError,
@ -50,9 +52,12 @@ class TestProcessInstanceProcessor(BaseTest):
def test_sets_permission_correctly_on_active_task( def test_sets_permission_correctly_on_active_task(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_sets_permission_correctly_on_active_task.""" """Test_sets_permission_correctly_on_active_task."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
finance_user = self.find_or_create_user("testuser2") finance_user = self.find_or_create_user("testuser2")
assert initiator_user.principal is not None assert initiator_user.principal is not None
@ -63,7 +68,9 @@ class TestProcessInstanceProcessor(BaseTest):
assert finance_group is not None assert finance_group is not None
process_model = load_test_spec( process_model = load_test_spec(
process_model_id="model_with_lanes", bpmn_file_name="lanes.bpmn" process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes.bpmn",
process_model_source_directory="model_with_lanes"
) )
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user process_model=process_model, user=initiator_user
@ -123,9 +130,12 @@ class TestProcessInstanceProcessor(BaseTest):
def test_sets_permission_correctly_on_active_task_when_using_dict( def test_sets_permission_correctly_on_active_task_when_using_dict(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_sets_permission_correctly_on_active_task_when_using_dict.""" """Test_sets_permission_correctly_on_active_task_when_using_dict."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user") initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3") finance_user_three = self.find_or_create_user("testuser3")
finance_user_four = self.find_or_create_user("testuser4") finance_user_four = self.find_or_create_user("testuser4")
@ -138,8 +148,9 @@ class TestProcessInstanceProcessor(BaseTest):
assert finance_group is not None assert finance_group is not None
process_model = load_test_spec( process_model = load_test_spec(
process_model_id="model_with_lanes", process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes_with_owner_dict.bpmn", bpmn_file_name="lanes_with_owner_dict.bpmn",
process_model_source_directory="model_with_lanes"
) )
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user process_model=process_model, user=initiator_user

View File

@ -1,11 +1,13 @@
"""Process Model.""" """Process Model."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.bpmn_process_id_lookup import BpmnProcessIdLookup from spiffworkflow_backend.models.bpmn_process_id_lookup import BpmnProcessIdLookup
from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
@ -22,11 +24,13 @@ class TestProcessModel(BaseTest):
assert process_model_one.files == [] assert process_model_one.files == []
def test_can_run_process_model_with_call_activities_when_in_same_process_model_directory( def test_can_run_process_model_with_call_activities_when_in_same_process_model_directory(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_run_process_model_with_call_activities.""" """Test_can_run_process_model_with_call_activities."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec( process_model = load_test_spec(
"call_activity_test", "test_group/call_activity_test",
# bpmn_file_name="call_activity_test.bpmn",
process_model_source_directory="call_activity_same_directory", process_model_source_directory="call_activity_same_directory",
) )
@ -38,11 +42,12 @@ class TestProcessModel(BaseTest):
assert process_instance.status == "complete" assert process_instance.status == "complete"
def test_can_run_process_model_with_call_activities_when_not_in_same_directory( def test_can_run_process_model_with_call_activities_when_not_in_same_directory(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_run_process_model_with_call_activities.""" """Test_can_run_process_model_with_call_activities."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec( process_model = load_test_spec(
"call_activity_nested", "test_group/call_activity_nested",
process_model_source_directory="call_activity_nested", process_model_source_directory="call_activity_nested",
bpmn_file_name="call_activity_nested", bpmn_file_name="call_activity_nested",
) )
@ -54,7 +59,7 @@ class TestProcessModel(BaseTest):
] ]
for bpmn_file_name in bpmn_file_names: for bpmn_file_name in bpmn_file_names:
load_test_spec( load_test_spec(
bpmn_file_name, f"test_group/{bpmn_file_name}",
process_model_source_directory="call_activity_nested", process_model_source_directory="call_activity_nested",
bpmn_file_name=bpmn_file_name, bpmn_file_name=bpmn_file_name,
) )
@ -66,11 +71,12 @@ class TestProcessModel(BaseTest):
assert process_instance.status == "complete" assert process_instance.status == "complete"
def test_can_run_process_model_with_call_activities_when_process_identifier_is_not_in_database( def test_can_run_process_model_with_call_activities_when_process_identifier_is_not_in_database(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_run_process_model_with_call_activities.""" """Test_can_run_process_model_with_call_activities."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec( process_model = load_test_spec(
"call_activity_nested", "test_group/call_activity_nested",
process_model_source_directory="call_activity_nested", process_model_source_directory="call_activity_nested",
bpmn_file_name="call_activity_nested", bpmn_file_name="call_activity_nested",
) )
@ -82,7 +88,7 @@ class TestProcessModel(BaseTest):
] ]
for bpmn_file_name in bpmn_file_names: for bpmn_file_name in bpmn_file_names:
load_test_spec( load_test_spec(
bpmn_file_name, f"test_group/{bpmn_file_name}",
process_model_source_directory="call_activity_nested", process_model_source_directory="call_activity_nested",
bpmn_file_name=bpmn_file_name, bpmn_file_name=bpmn_file_name,
) )
@ -93,6 +99,7 @@ class TestProcessModel(BaseTest):
# delete all of the id lookup items to force to processor to find the correct # delete all of the id lookup items to force to processor to find the correct
# process model when running the process # process model when running the process
db.session.query(BpmnProcessIdLookup).delete() db.session.query(BpmnProcessIdLookup).delete()
db.session.commit()
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
assert process_instance.status == "complete" assert process_instance.status == "complete"

View File

@ -1,8 +1,10 @@
"""Test_process_model_service.""" """Test_process_model_service."""
from flask import Flask from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -10,11 +12,16 @@ class TestProcessModelService(BaseTest):
"""TestProcessModelService.""" """TestProcessModelService."""
def test_can_update_specified_attributes( def test_can_update_specified_attributes(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_update_specified_attributes.""" """Test_can_update_specified_attributes."""
process_model = load_test_spec("hello_world") self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
assert process_model.display_name == "hello_world" process_model = load_test_spec(
"test_group/hello_world",
bpmn_file_name="hello_world.bpmn",
process_model_source_directory="hello_world"
)
assert process_model.display_name == "test_group/hello_world"
primary_process_id = process_model.primary_process_id primary_process_id = process_model.primary_process_id
assert primary_process_id == "Process_HelloWorld" assert primary_process_id == "Process_HelloWorld"

View File

@ -1,10 +1,12 @@
"""Test_various_bpmn_constructs.""" """Test_various_bpmn_constructs."""
import pytest import pytest
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.api.api_error import ApiError from flask_bpmn.api.api_error import ApiError
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
@ -14,11 +16,12 @@ class TestOpenFile(BaseTest):
"""TestVariousBpmnConstructs.""" """TestVariousBpmnConstructs."""
def test_dot_notation( def test_dot_notation(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_form_data_conversion_to_dot_dict.""" """Test_form_data_conversion_to_dot_dict."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec( process_model = load_test_spec(
"dangerous", "test_group/dangerous",
bpmn_file_name="read_etc_passwd.bpmn", bpmn_file_name="read_etc_passwd.bpmn",
process_model_source_directory="dangerous-scripts", process_model_source_directory="dangerous-scripts",
) )
@ -38,11 +41,12 @@ class TestImportModule(BaseTest):
"""TestVariousBpmnConstructs.""" """TestVariousBpmnConstructs."""
def test_dot_notation( def test_dot_notation(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_form_data_conversion_to_dot_dict.""" """Test_form_data_conversion_to_dot_dict."""
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec( process_model = load_test_spec(
"dangerous", "test_group/dangerous",
bpmn_file_name="read_env.bpmn", bpmn_file_name="read_env.bpmn",
process_model_source_directory="dangerous-scripts", process_model_source_directory="dangerous-scripts",
) )

View File

@ -1,8 +1,10 @@
"""Test Permissions.""" """Test Permissions."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
@ -16,17 +18,25 @@ class TestScriptUnitTestRunner(BaseTest):
def test_takes_data_and_returns_expected_result( def test_takes_data_and_returns_expected_result(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_takes_data_and_returns_expected_result.""" """Test_takes_data_and_returns_expected_result."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger" process_group_id = "test_logging_spiff_logger"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script" process_model_id = "simple_script"
load_test_spec(process_model_id, process_group_id=process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
process_model_identifier,
bpmn_file_name=process_model_id,
process_model_source_directory=process_model_id
)
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model( ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id process_model_identifier
) )
) )
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier( task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
@ -48,17 +58,26 @@ class TestScriptUnitTestRunner(BaseTest):
def test_fails_when_expected_output_does_not_match_actual_output( def test_fails_when_expected_output_does_not_match_actual_output(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_fails_when_expected_output_does_not_match_actual_output.""" """Test_fails_when_expected_output_does_not_match_actual_output."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger" process_group_id = "test_logging_spiff_logger"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script" process_model_id = "simple_script"
load_test_spec(process_model_id, process_group_id=process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
process_model_identifier,
bpmn_file_name=process_model_id,
process_model_source_directory=process_model_id
)
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model( ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id process_model_identifier
) )
) )
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier( task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
@ -80,17 +99,26 @@ class TestScriptUnitTestRunner(BaseTest):
def test_script_with_unit_tests_when_hey_is_passed_in( def test_script_with_unit_tests_when_hey_is_passed_in(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_script_with_unit_tests_when_hey_is_passed_in.""" """Test_script_with_unit_tests_when_hey_is_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests" process_group_id = "script_with_unit_tests"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests" process_model_id = "script_with_unit_tests"
load_test_spec(process_model_id, process_group_id=process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
process_model_identifier,
bpmn_file_name=process_model_id,
process_model_source_directory=process_model_id
)
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model( ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id process_model_identifier
) )
) )
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier( task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
@ -110,17 +138,27 @@ class TestScriptUnitTestRunner(BaseTest):
def test_script_with_unit_tests_when_hey_is_not_passed_in( def test_script_with_unit_tests_when_hey_is_not_passed_in(
self, self,
app: Flask, app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_script_with_unit_tests_when_hey_is_not_passed_in.""" """Test_script_with_unit_tests_when_hey_is_not_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests" process_group_id = "script_with_unit_tests"
self.create_process_group(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests" process_model_id = "script_with_unit_tests"
load_test_spec(process_model_id, process_group_id=process_group_id) process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
process_model_identifier,
bpmn_file_name=process_model_id,
process_model_source_directory=process_model_id
)
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model( ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id process_model_identifier
) )
) )
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier( task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(

View File

@ -3,29 +3,38 @@ import os
import pytest import pytest
from flask import Flask from flask import Flask
from flask.testing import FlaskClient
from flask_bpmn.api.api_error import ApiError from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.bpmn_process_id_lookup import BpmnProcessIdLookup from spiffworkflow_backend.models.bpmn_process_id_lookup import BpmnProcessIdLookup
from spiffworkflow_backend.models.user import UserModel
class TestSpecFileService(BaseTest): class TestSpecFileService(BaseTest):
"""TestSpecFileService.""" """TestSpecFileService."""
process_group_id = "test_process_group_id"
process_model_id = "call_activity_nested"
bpmn_file_name = "call_activity_nested.bpmn"
call_activity_nested_relative_file_path = os.path.join( call_activity_nested_relative_file_path = os.path.join(
"test_process_group_id", "call_activity_nested", "call_activity_nested.bpmn" process_group_id, process_model_id, bpmn_file_name
) )
def test_can_store_process_ids_for_lookup( def test_can_store_process_ids_for_lookup(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_can_store_process_ids_for_lookup.""" """Test_can_store_process_ids_for_lookup."""
load_test_spec( self.basic_test_setup(
"call_activity_nested", client=client,
process_model_source_directory="call_activity_nested", user=with_super_admin_user,
bpmn_file_name="call_activity_nested", process_group_id=self.process_group_id,
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location="call_activity_nested"
) )
bpmn_process_id_lookups = BpmnProcessIdLookup.query.all() bpmn_process_id_lookups = BpmnProcessIdLookup.query.all()
assert len(bpmn_process_id_lookups) == 1 assert len(bpmn_process_id_lookups) == 1
@ -36,14 +45,17 @@ class TestSpecFileService(BaseTest):
) )
def test_fails_to_save_duplicate_process_id( def test_fails_to_save_duplicate_process_id(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_fails_to_save_duplicate_process_id.""" """Test_fails_to_save_duplicate_process_id."""
bpmn_process_identifier = "Level1" bpmn_process_identifier = "Level1"
load_test_spec( self.basic_test_setup(
"call_activity_nested", client=client,
process_model_source_directory="call_activity_nested", user=with_super_admin_user,
bpmn_file_name="call_activity_nested", process_group_id=self.process_group_id,
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id
) )
bpmn_process_id_lookups = BpmnProcessIdLookup.query.all() bpmn_process_id_lookups = BpmnProcessIdLookup.query.all()
assert len(bpmn_process_id_lookups) == 1 assert len(bpmn_process_id_lookups) == 1
@ -66,25 +78,26 @@ class TestSpecFileService(BaseTest):
) )
def test_updates_relative_file_path_when_appropriate( def test_updates_relative_file_path_when_appropriate(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_updates_relative_file_path_when_appropriate.""" """Test_updates_relative_file_path_when_appropriate."""
bpmn_process_identifier = "Level1" bpmn_process_identifier = "Level1"
bpmn_file_relative_path = os.path.join(
"test_process_group_id", "call_activity_nested", "new_bpmn_file.bpmn"
)
process_id_lookup = BpmnProcessIdLookup( process_id_lookup = BpmnProcessIdLookup(
bpmn_process_identifier=bpmn_process_identifier, bpmn_process_identifier=bpmn_process_identifier,
bpmn_file_relative_path=bpmn_file_relative_path, bpmn_file_relative_path=self.call_activity_nested_relative_file_path,
) )
db.session.add(process_id_lookup) db.session.add(process_id_lookup)
db.session.commit() db.session.commit()
load_test_spec( self.basic_test_setup(
"call_activity_nested", client=client,
process_model_source_directory="call_activity_nested", user=with_super_admin_user,
bpmn_file_name="call_activity_nested", process_group_id=self.process_group_id,
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id
) )
bpmn_process_id_lookups = BpmnProcessIdLookup.query.all() bpmn_process_id_lookups = BpmnProcessIdLookup.query.all()
assert len(bpmn_process_id_lookups) == 1 assert len(bpmn_process_id_lookups) == 1
assert ( assert (

View File

@ -1,23 +1,32 @@
"""Test_various_bpmn_constructs.""" """Test_various_bpmn_constructs."""
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class TestVariousBpmnConstructs(BaseTest): class TestVariousBpmnConstructs(BaseTest):
"""TestVariousBpmnConstructs.""" """TestVariousBpmnConstructs."""
def test_running_process_with_timer_intermediate_catch_event( def test_running_process_with_timer_intermediate_catch_event(
self, app: Flask, with_db_and_bpmn_file_cleanup: None self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel
) -> None: ) -> None:
"""Test_running_process_with_timer_intermediate_catch_event.""" """Test_running_process_with_timer_intermediate_catch_event."""
process_model = load_test_spec( process_model_identifier = self.basic_test_setup(
"timers_intermediate_catch_event", client,
process_model_source_directory="timer_intermediate_catch_event", with_super_admin_user,
"test_group",
"timer_intermediate_catch_event"
)
process_model = ProcessModelService().get_process_model(
process_model_id=process_model_identifier
) )
process_instance = self.create_process_instance_from_process_model( process_instance = self.create_process_instance_from_process_model(