Merge remote-tracking branch 'origin/main' into feature/allow-dot-notation-in-forms

This commit is contained in:
burnettk 2022-09-29 10:21:47 -04:00
commit e834a54f0d
21 changed files with 767 additions and 159 deletions

View File

@ -56,7 +56,7 @@ def main():
}
columns_to_header_index_mappings = {}
user = UserModel.query.filter_by(username="test_user1").first()
user = UserModel.query.first()
with open("tests/files/tickets.csv") as infile:
reader = csv.reader(infile, delimiter=",")

View File

@ -25,6 +25,7 @@ def main():
app = create_app()
with app.app_context():
no_primary = []
failing_process_models = []
process_models = ProcessModelService().get_process_models()
for process_model in process_models:
if process_model.primary_file_name:
@ -42,11 +43,16 @@ def main():
if process_model.primary_file_name in bad_files:
continue
print(f"primary_file_name: {process_model.primary_file_name}")
SpecFileService.update_file(
process_model,
process_model.primary_file_name,
bpmn_xml_file_contents,
)
try:
SpecFileService.update_file(
process_model,
process_model.primary_file_name,
bpmn_xml_file_contents,
)
except Exception as ex:
failing_process_models.append(
(process_model.primary_file_name, str(ex))
)
# files = SpecFileService.get_files(
# process_model, extension_filter="bpmn"
# )
@ -83,6 +89,10 @@ def main():
no_primary.append(process_model)
# for bpmn in no_primary:
# print(bpmn)
for bpmn_errors in failing_process_models:
print(bpmn_errors)
if len(failing_process_models) > 0:
exit(1)
if __name__ == "__main__":

View File

@ -1,3 +1,3 @@
furo==2022.9.15
sphinx==5.1.1
sphinx==5.2.2
sphinx-click==4.3.0

6
poetry.lock generated
View File

@ -1846,8 +1846,8 @@ pytz = "*"
[package.source]
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "dec9b4b942378d030ae73f1365dfbf108e6f7f8c"
reference = "feature/script-unit-test-extensions"
resolved_reference = "056ecc786fe400c7e6ca4f7ee1905d34a0f261b8"
[[package]]
name = "sqlalchemy"
@ -2157,7 +2157,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "7a3c07a2eef00685adbf44b6e26b740e20fc52bf85e916b6c171b13d4fcc6dc9"
content-hash = "f362f53c3a5a15701751dc2b7c3984db8e85086b7759ffe109ffca74c252a516"
[metadata.files]
alabaster = [

View File

@ -27,8 +27,8 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "feature/dependencies"}
# SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "feature/script-unit-test-extensions"}
# SpiffWorkflow = {develop = true, path = "/home/jason/projects/github/sartography/SpiffWorkflow"}
sentry-sdk = "1.9.0"
sphinx-autoapi = "^1.8.4"

View File

@ -171,7 +171,7 @@ paths:
schema:
type: array
items:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
# process_group_add
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_group_add
@ -182,14 +182,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
responses:
"201":
description: Processs Group
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
/process-groups/{process_group_id}:
parameters:
@ -211,7 +211,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_group_delete
summary: Deletes a single process group
@ -229,14 +229,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
responses:
"200":
description: Process group updated successfully
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
/process-models:
parameters:
@ -272,25 +272,25 @@ paths:
schema:
type: array
items:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
# process_model_add
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_add
summary: Creates a new workflow specification with the given parameters.
summary: Creates a new process model with the given parameters.
tags:
- Process Models
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
responses:
"201":
description: Workflow specification created successfully.
description: Process model created successfully.
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
/process-models/{process_group_id}/{process_model_id}/files:
parameters:
@ -303,7 +303,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification to validate.
description: The unique id of an existing process model to validate.
schema:
type: string
# add_file
@ -369,7 +369,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
# process_model_delete
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_delete
@ -393,14 +393,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
responses:
"200":
description: Process model updated successfully.
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
/process-instances:
parameters:
@ -413,7 +413,7 @@ paths:
- name: process_model_identifier
in: query
required: false
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
- name: page
@ -473,6 +473,61 @@ paths:
type: array
items:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/script-unit-tests:
parameters:
- name: process_group_id
in: path
required: true
description: The unique id of an existing process group
schema:
type: string
- name: process_model_id
in: path
required: true
description: The unique id of an existing process model.
schema:
type: string
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.script_unit_test_create
summary: Create script unit test based on given criteria
tags:
- Script Unit Test
responses:
"200":
description: Script Unit Test Result
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/script-unit-tests/run:
parameters:
- name: process_group_id
in: path
required: true
description: The unique id of an existing process group
schema:
type: string
- name: process_model_id
in: path
required: true
description: The unique id of an existing process model.
schema:
type: string
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.script_unit_test_run
summary: Run a given script unit test.
tags:
- Script Unit Test
responses:
"200":
description: Script Unit Test Result
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/process-models/{process_group_id}/{process_model_id}/process-instances:
parameters:
- name: process_group_id
@ -484,7 +539,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
# process_instance_create
@ -558,7 +613,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
- name: process_instance_id
@ -598,7 +653,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
- name: process_instance_id
@ -631,7 +686,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
- name: page
@ -684,7 +739,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification.
description: The unique id of an existing process model.
schema:
type: string
- name: report_identifier
@ -755,7 +810,7 @@ paths:
- name: process_model_id
in: path
required: true
description: The unique id of an existing workflow specification to validate.
description: The unique id of an existing process model to validate.
schema:
type: string
- name: file_name
@ -928,7 +983,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
responses:
"200":
description: One task
@ -1048,6 +1103,19 @@ paths:
$ref: "#/components/schemas/ProcessInstanceLog"
/secrets:
parameters:
- name: page
in: query
required: false
description: The page number to return. Defaults to page 1.
schema:
type: integer
- name: per_page
in: query
required: false
description: The number of items to show per page. Defaults to page 10.
schema:
type: integer
post:
operationId: spiffworkflow_backend.routes.process_api_blueprint.add_secret
summary: Create a secret for a key and value
@ -1065,6 +1133,18 @@ paths:
application/json:
schema:
type: number
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.secret_list
summary: Return list of all secrets
tags:
- Secrets
responses:
"200":
description: list of secrets
content:
application/json:
schema:
$ref: "#/components/schemas/Secret"
/secrets/{key}:
parameters:
@ -1085,7 +1165,7 @@ paths:
content:
application/json:
schema:
type: string
$ref: "#/components/schemas/Secret"
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.delete_secret
summary: Delete an existing secret
@ -1203,7 +1283,7 @@ components:
properties:
id:
type: string
WorkflowSpecDiffList:
ProcessModelDiffList:
properties:
workflow_spec_id:
type: string
@ -1216,7 +1296,7 @@ components:
new:
type: boolean
example: false
WorkflowSpecFilesList:
ProcessModelFilesList:
properties:
file_model_id:
type: integer
@ -1245,7 +1325,7 @@ components:
type: string
example: f12e2bbd-a20c-673b-ccb8-a8a1ea9c5b7b
WorkflowSpecFilesDiff:
ProcessModelFilesDiff:
properties:
filename:
type: string
@ -1273,7 +1353,7 @@ components:
new:
type: boolean
example: false
WorkflowSpecAll:
ProcessModelAll:
properties:
workflow_spec_id:
type: string
@ -1377,7 +1457,7 @@ components:
x-nullable: true
example: Some Value
WorkflowSpec:
ProcessModel:
properties:
id:
type: string
@ -1398,11 +1478,11 @@ components:
example: false
default: false
workflow_spec_category:
$ref: "#/components/schemas/WorkflowSpecCategory"
$ref: "#/components/schemas/ProcessModelCategory"
is_status:
type: boolean
nullable: true
WorkflowSpecCategory:
ProcessModelCategory:
properties:
id:
type: string
@ -1622,7 +1702,7 @@ components:
study:
$ref: "#/components/schemas/Study"
workflow_sec:
$ref: "#/components/schemas/WorkflowSpec"
$ref: "#/components/schemas/ProcessModel"
spec_version:
type: string
action:

View File

@ -1,13 +1,17 @@
"""Secret_model."""
from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from marshmallow import Schema
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import RelationshipProperty
from spiffworkflow_backend.models.user import UserModel
@dataclass()
class SecretModel(SpiffworkflowBaseDBModel):
"""SecretModel."""
@ -17,19 +21,12 @@ class SecretModel(SpiffworkflowBaseDBModel):
value: str = db.Column(db.String(255), nullable=False)
creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False)
allowed_processes = relationship("SecretAllowedProcessPathModel", cascade="delete")
class SecretModelSchema(Schema):
"""SecretModelSchema."""
class Meta:
"""Meta."""
model = SecretModel
fields = ["key", "value", "creator_user_id"]
allowed_processes: RelationshipProperty = relationship(
"SecretAllowedProcessPathModel", cascade="delete"
)
@dataclass()
class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
"""Allowed processes can be Process Groups or Process Models.
@ -48,6 +45,16 @@ class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
allowed_relative_path: str = db.Column(db.String(500), nullable=False)
class SecretModelSchema(Schema):
"""SecretModelSchema."""
class Meta:
"""Meta."""
model = SecretModel
fields = ["key", "value", "creator_user_id", "allowed_processes"]
class SecretAllowedProcessSchema(Schema):
"""SecretAllowedProcessSchema."""

View File

@ -1,6 +1,7 @@
"""Task."""
import enum
from typing import Any
from typing import Optional
from typing import Union
import marshmallow
@ -113,6 +114,7 @@ class Task:
process_model_display_name: Union[str, None] = None,
form_schema: Union[str, None] = None,
form_ui_schema: Union[str, None] = None,
parent: Optional[str] = None,
):
"""__init__."""
self.id = id
@ -123,6 +125,7 @@ class Task:
self.form = form
self.documentation = documentation
self.lane = lane
self.parent = parent
self.data = data
if self.data is None:
@ -174,6 +177,7 @@ class Task:
"process_model_display_name": self.process_model_display_name,
"form_schema": self.form_schema,
"form_ui_schema": self.form_ui_schema,
"parent": self.parent,
}
@classmethod

View File

@ -1,6 +1,8 @@
"""APIs for dealing with process groups, process models, and process instances."""
import json
import os
import random
import string
import uuid
from typing import Any
from typing import Dict
@ -19,6 +21,8 @@ from flask import request
from flask.wrappers import Response
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from lxml import etree # type: ignore
from lxml.builder import ElementMaker # type: ignore
from SpiffWorkflow import Task as SpiffTask # type: ignore
from SpiffWorkflow import TaskState
from sqlalchemy import desc
@ -44,6 +48,7 @@ from spiffworkflow_backend.models.process_instance_report import (
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessSchema
from spiffworkflow_backend.models.secret_model import SecretModel
from spiffworkflow_backend.models.secret_model import SecretModelSchema
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
from spiffworkflow_backend.models.user import UserModel
@ -58,6 +63,7 @@ from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.script_unit_test_runner import ScriptUnitTestRunner
from spiffworkflow_backend.services.secret_service import SecretService
from spiffworkflow_backend.services.service_task_service import ServiceTaskService
from spiffworkflow_backend.services.spec_file_service import SpecFileService
@ -875,7 +881,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
)
)
if task.type == "UserTask":
if task.type == "User Task":
if not form_schema_file_name:
raise (
ApiError(
@ -902,13 +908,12 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
)
if ui_form_contents:
task.form_ui_schema = ui_form_contents
elif task.type == "ManualTask":
elif task.type == "Manual Task":
if task.properties and task.data:
if task.properties["instructionsForEndUser"]:
task.properties["instructionsForEndUser"] = render_jinja_template(
task.properties["instructionsForEndUser"], task.data
)
return make_response(jsonify(task), 200)
@ -974,6 +979,128 @@ def task_submit(
return Response(json.dumps({"ok": True}), status=202, mimetype="application/json")
def script_unit_test_create(
process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]
) -> flask.wrappers.Response:
"""Script_unit_test_run."""
bpmn_task_identifier = get_required_parameter_or_raise("bpmn_task_identifier", body)
input_json = get_required_parameter_or_raise("input_json", body)
expected_output_json = get_required_parameter_or_raise("expected_output_json", body)
process_model = get_process_model(process_model_id, process_group_id)
file = SpecFileService.get_files(process_model, process_model.primary_file_name)[0]
if file is None:
raise ApiError(
code="cannot_find_file",
message=f"Could not find the primary bpmn file for process_model: {process_model.id}",
status_code=404,
)
# TODO: move this to an xml service or something
file_contents = SpecFileService.get_data(process_model, file.name)
bpmn_etree_element = SpecFileService.get_etree_element_from_binary_data(
file_contents, file.name
)
nsmap = bpmn_etree_element.nsmap
spiff_element_maker = ElementMaker(
namespace="http://spiffworkflow.org/bpmn/schema/1.0/core", nsmap=nsmap
)
script_task_elements = bpmn_etree_element.xpath(
f"//bpmn:scriptTask[@id='{bpmn_task_identifier}']",
namespaces={"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL"},
)
if len(script_task_elements) == 0:
raise ApiError(
code="missing_script_task",
message=f"Cannot find a script task with id: {bpmn_task_identifier}",
status_code=404,
)
script_task_element = script_task_elements[0]
extension_elements = None
extension_elements_array = script_task_element.xpath(
"//bpmn:extensionElements",
namespaces={"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL"},
)
if len(extension_elements_array) == 0:
bpmn_element_maker = ElementMaker(
namespace="http://www.omg.org/spec/BPMN/20100524/MODEL", nsmap=nsmap
)
extension_elements = bpmn_element_maker("extensionElements")
script_task_element.append(extension_elements)
else:
extension_elements = extension_elements_array[0]
unit_test_elements = None
unit_test_elements_array = extension_elements.xpath(
"//spiffworkflow:unitTests",
namespaces={"spiffworkflow": "http://spiffworkflow.org/bpmn/schema/1.0/core"},
)
if len(unit_test_elements_array) == 0:
unit_test_elements = spiff_element_maker("unitTests")
extension_elements.append(unit_test_elements)
else:
unit_test_elements = unit_test_elements_array[0]
fuzz = "".join(
random.choice(string.ascii_uppercase + string.digits) # noqa: S311
for _ in range(7)
)
unit_test_id = f"unit_test_{fuzz}"
input_json_element = spiff_element_maker("inputJson", json.dumps(input_json))
expected_output_json_element = spiff_element_maker(
"expectedOutputJson", json.dumps(expected_output_json)
)
unit_test_element = spiff_element_maker("unitTest", id=unit_test_id)
unit_test_element.append(input_json_element)
unit_test_element.append(expected_output_json_element)
unit_test_elements.append(unit_test_element)
SpecFileService.update_file(
process_model, file.name, etree.tostring(bpmn_etree_element)
)
return Response(json.dumps({"ok": True}), status=202, mimetype="application/json")
def script_unit_test_run(
process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]
) -> flask.wrappers.Response:
"""Script_unit_test_run."""
# FIXME: We should probably clear this somewhere else but this works
current_app.config["THREAD_LOCAL_DATA"].process_instance_id = None
bpmn_task_identifier = get_required_parameter_or_raise("bpmn_task_identifier", body)
python_script = get_required_parameter_or_raise("python_script", body)
input_json = get_required_parameter_or_raise("input_json", body)
expected_output_json = get_required_parameter_or_raise("expected_output_json", body)
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id
)
)
spiff_task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
bpmn_task_identifier, bpmn_process_instance
)
if spiff_task is None:
raise (
ApiError(
code="task_not_found",
message=f"Could not find task with identifier: {bpmn_task_identifier}",
status_code=400,
)
)
result = ScriptUnitTestRunner.run_with_task_and_script_and_pre_post_contexts(
spiff_task, python_script, input_json, expected_output_json
)
return make_response(jsonify(result), 200)
def get_file_from_request() -> Any:
"""Get_file_from_request."""
request_file = connexion.request.files.get("file")
@ -1120,11 +1247,33 @@ def get_secret(key: str) -> Optional[str]:
return SecretService.get_secret(key)
def secret_list(
page: int = 1,
per_page: int = 100,
) -> Response:
"""Secret_list."""
secrets = (
SecretModel.query.order_by(SecretModel.key)
.join(UserModel)
.add_columns(
UserModel.username,
)
.paginate(page, per_page, False)
)
response_json = {
"results": secrets.items,
"pagination": {
"count": len(secrets.items),
"total": secrets.total,
"pages": secrets.pages,
},
}
return make_response(jsonify(response_json), 200)
def add_secret(body: Dict) -> Response:
"""Add secret."""
secret_model = SecretService().add_secret(
body["key"], body["value"], body["creator_user_id"]
)
secret_model = SecretService().add_secret(body["key"], body["value"], g.user.id)
assert secret_model # noqa: S101
return Response(
json.dumps(SecretModelSchema().dump(secret_model)),
@ -1133,18 +1282,20 @@ def add_secret(body: Dict) -> Response:
)
def update_secret(key: str, body: dict) -> None:
def update_secret(key: str, body: dict) -> Response:
"""Update secret."""
SecretService().update_secret(key, body["value"], body["creator_user_id"])
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def delete_secret(key: str) -> None:
def delete_secret(key: str) -> Response:
"""Delete secret."""
current_user = UserService.current_user()
SecretService.delete_secret(key, current_user.id)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def add_allowed_process_path(body: dict) -> Any:
def add_allowed_process_path(body: dict) -> Response:
"""Get allowed process paths."""
allowed_process_path = SecretService.add_allowed_process(
body["secret_id"], g.user.id, body["allowed_relative_path"]
@ -1156,6 +1307,25 @@ def add_allowed_process_path(body: dict) -> Any:
)
def delete_allowed_process_path(allowed_process_path_id: int) -> Any:
def delete_allowed_process_path(allowed_process_path_id: int) -> Response:
"""Get allowed process paths."""
SecretService().delete_allowed_process(allowed_process_path_id, g.user.id)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any:
"""Get_required_parameter_or_raise."""
return_value = None
if parameter in post_body:
return_value = post_body[parameter]
if return_value is None or return_value == "":
raise (
ApiError(
code="missing_required_parameter",
message=f"Parameter is missing from json request body: {parameter}",
status_code=400,
)
)
return return_value

View File

@ -178,7 +178,9 @@ class DBHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
"""Emit."""
if record:
# if we do not have a process instance id then do not log and assume we are running a script unit test
# that initializes a BpmnWorkflow without a process instance
if record and record.process_instance_id: # type: ignore
bpmn_process_identifier = record.workflow # type: ignore
spiff_task_guid = str(record.task_id) # type: ignore
bpmn_task_identifier = str(record.task_spec) # type: ignore

View File

@ -30,7 +30,6 @@ from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
from SpiffWorkflow.dmn.serializer import BusinessRuleTaskConverter # type: ignore
from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
from SpiffWorkflow.specs import WorkflowSpec # type: ignore
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
from SpiffWorkflow.spiff.serializer import BoundaryEventConverter # type: ignore
from SpiffWorkflow.spiff.serializer import CallActivityTaskConverter
@ -40,6 +39,7 @@ from SpiffWorkflow.spiff.serializer import IntermediateThrowEventConverter
from SpiffWorkflow.spiff.serializer import ManualTaskConverter
from SpiffWorkflow.spiff.serializer import NoneTaskConverter
from SpiffWorkflow.spiff.serializer import ReceiveTaskConverter
from SpiffWorkflow.spiff.serializer import ScriptTaskConverter
from SpiffWorkflow.spiff.serializer import SendTaskConverter
from SpiffWorkflow.spiff.serializer import ServiceTaskConverter
from SpiffWorkflow.spiff.serializer import StartEventConverter
@ -159,6 +159,7 @@ class ProcessInstanceProcessor:
ManualTaskConverter,
NoneTaskConverter,
ReceiveTaskConverter,
ScriptTaskConverter,
SendTaskConverter,
ServiceTaskConverter,
StartEventConverter,
@ -185,23 +186,16 @@ class ProcessInstanceProcessor:
self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService()
spec = None
bpmn_process_spec = None
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.bpmn_json is None:
spec_info = self.process_model_service.get_process_model(
process_instance_model.process_model_identifier
(
bpmn_process_spec,
subprocesses,
) = ProcessInstanceProcessor.get_process_model_and_subprocesses(
process_instance_model.process_model_identifier,
process_instance_model.process_group_identifier,
)
if spec_info is None:
raise (
ApiError(
"missing_spec",
"The spec this process_instance references does not currently exist.",
)
)
self.spec_files = SpecFileService.get_files(
spec_info, include_libraries=True
)
(spec, subprocesses) = self.get_spec(self.spec_files, spec_info)
else:
bpmn_json_length = len(process_instance_model.bpmn_json.encode("utf-8"))
megabyte = float(1024**2)
@ -255,7 +249,10 @@ class ProcessInstanceProcessor:
try:
self.bpmn_process_instance = self.__get_bpmn_process_instance(
process_instance_model, spec, validate_only, subprocesses=subprocesses
process_instance_model,
bpmn_process_spec,
validate_only,
subprocesses=subprocesses,
)
self.bpmn_process_instance.script_engine = self._script_engine
@ -281,6 +278,39 @@ class ProcessInstanceProcessor:
% (self.process_model_identifier, str(ke)),
) from ke
@classmethod
def get_process_model_and_subprocesses(
cls, process_model_identifier: str, process_group_identifier: str
) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]:
"""Get_process_model_and_subprocesses."""
process_model_info = ProcessModelService().get_process_model(
process_model_identifier, process_group_identifier
)
if process_model_info is None:
raise (
ApiError(
"process_model_not_found",
f"The given process model was not found: {process_group_identifier}/{process_model_identifier}.",
)
)
spec_files = SpecFileService.get_files(
process_model_info, include_libraries=True
)
return cls.get_spec(spec_files, process_model_info)
@classmethod
def get_bpmn_process_instance_from_process_model(
cls, process_model_identifier: str, process_group_identifier: str
) -> BpmnWorkflow:
"""Get_all_bpmn_process_identifiers_for_process_model."""
(bpmn_process_spec, subprocesses) = cls.get_process_model_and_subprocesses(
process_model_identifier,
process_group_identifier,
)
return cls.get_bpmn_process_instance_from_workflow_spec(
bpmn_process_spec, subprocesses
)
def add_user_info_to_process_instance(
self, bpmn_process_instance: BpmnWorkflow
) -> None:
@ -354,10 +384,22 @@ class ProcessInstanceProcessor:
# UserFileService().delete_file(file.id)
db.session.commit()
@staticmethod
def get_bpmn_process_instance_from_workflow_spec(
spec: BpmnProcessSpec,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
"""Get_bpmn_process_instance_from_workflow_spec."""
return BpmnWorkflow(
spec,
script_engine=ProcessInstanceProcessor._script_engine,
subprocess_specs=subprocesses,
)
@staticmethod
def __get_bpmn_process_instance(
process_instance_model: ProcessInstanceModel,
spec: Optional[WorkflowSpec] = None,
spec: Optional[BpmnProcessSpec] = None,
validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
@ -383,10 +425,10 @@ class ProcessInstanceProcessor:
ProcessInstanceProcessor._script_engine
)
else:
bpmn_process_instance = BpmnWorkflow(
spec,
script_engine=ProcessInstanceProcessor._script_engine,
subprocess_specs=subprocesses,
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec(
spec, subprocesses
)
)
bpmn_process_instance.data[
ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
@ -611,9 +653,9 @@ class ProcessInstanceProcessor:
)
try:
spec = parser.get_spec(process_model_info.primary_process_id)
bpmn_process_spec = parser.get_spec(process_model_info.primary_process_id)
# returns a dict of {process_id: spec}, otherwise known as an IdToBpmnProcessSpecMapping
# returns a dict of {process_id: bpmn_process_spec}, otherwise known as an IdToBpmnProcessSpecMapping
subprocesses = parser.get_subprocess_specs(
process_model_info.primary_process_id
)
@ -626,7 +668,7 @@ class ProcessInstanceProcessor:
task_id=ve.id,
tag=ve.tag,
) from ve
return (spec, subprocesses)
return (bpmn_process_spec, subprocesses)
@staticmethod
def status_of(bpmn_process_instance: BpmnWorkflow) -> ProcessInstanceStatus:
@ -979,9 +1021,12 @@ class ProcessInstanceProcessor:
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [t for t in all_tasks if t.state in [TaskState.WAITING, TaskState.READY]]
def get_task_by_bpmn_identifier(self, bpmn_task_identifier: str) -> SpiffTask:
@classmethod
def get_task_by_bpmn_identifier(
cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow
) -> Optional[SpiffTask]:
"""Get_task_by_id."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
all_tasks = bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
for task in all_tasks:
if task.task_spec.name == bpmn_task_identifier:
return task

View File

@ -8,14 +8,6 @@ from typing import Optional
from flask import current_app
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from SpiffWorkflow.bpmn.specs.events import EndEvent # type: ignore
from SpiffWorkflow.bpmn.specs.events import StartEvent
from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask # type: ignore
from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask # type: ignore
from SpiffWorkflow.specs import CancelTask # type: ignore
from SpiffWorkflow.specs import StartTask
from SpiffWorkflow.spiff.specs.manual_task import ManualTask # type: ignore
from SpiffWorkflow.spiff.specs.user_task import UserTask # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
@ -271,30 +263,6 @@ class ProcessInstanceService:
return lane_uids
# @staticmethod
# def get_task_type(spiff_task: SpiffTask):
# """Get_task_type."""
# task_type = spiff_task.task_spec.__class__.__name__
#
# task_types = [
# UserTask,
# ManualTask,
# BusinessRuleTask,
# CancelTask,
# ScriptTask,
# StartTask,
# EndEvent,
# StartEvent,
# ]
#
# for t in task_types:
# if isinstance(spiff_task.task_spec, t):
# task_type = t.__name__
# break
# else:
# task_type = "NoneTask"
# return task_type
@staticmethod
def complete_form_task(
processor: ProcessInstanceProcessor,
@ -425,25 +393,7 @@ class ProcessInstanceService:
spiff_task: SpiffTask, add_docs_and_forms: bool = False
) -> Task:
"""Spiff_task_to_api_task."""
task_type = spiff_task.task_spec.__class__.__name__
task_types = [
UserTask,
ManualTask,
BusinessRuleTask,
CancelTask,
ScriptTask,
StartTask,
EndEvent,
StartEvent,
]
for t in task_types:
if isinstance(spiff_task.task_spec, t):
task_type = t.__name__
break
else:
task_type = "NoneTask"
task_type = spiff_task.task_spec.spec_type
info = spiff_task.task_info()
if info["is_looping"]:
@ -465,6 +415,10 @@ class ProcessInstanceService:
else:
lane = None
parent_id = None
if spiff_task.parent:
parent_id = spiff_task.parent.id
task = Task(
spiff_task.id,
spiff_task.task_spec.name,
@ -477,6 +431,7 @@ class ProcessInstanceService:
multi_instance_index=info["mi_index"],
process_name=spiff_task.task_spec._wf_spec.description,
properties=props,
parent=parent_id,
)
return task

View File

@ -0,0 +1,99 @@
"""Process_instance_processor."""
import json
from dataclasses import dataclass
from typing import Any
from typing import Optional
from SpiffWorkflow import Task as SpiffTask # type: ignore
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskExecException # type: ignore
from spiffworkflow_backend.services.process_instance_processor import (
CustomBpmnScriptEngine,
)
PythonScriptContext = dict[str, Any]
@dataclass
class ScriptUnitTestResult:
"""ScriptUnitTestResult."""
result: bool
context: Optional[PythonScriptContext] = None
error: Optional[str] = None
line_number: Optional[int] = None
offset: Optional[int] = None
class ScriptUnitTestRunner:
"""ScriptUnitTestRunner."""
_script_engine = CustomBpmnScriptEngine()
@classmethod
def run_with_task_and_script_and_pre_post_contexts(
cls,
task: SpiffTask,
script: str,
input_context: PythonScriptContext,
expected_output_context: PythonScriptContext,
) -> ScriptUnitTestResult:
"""Run_task."""
task.data = input_context
try:
cls._script_engine.execute(task, script)
except WorkflowTaskExecException as ex:
return ScriptUnitTestResult(
result=False,
error=f"Failed to execute script: {str(ex)}",
line_number=ex.line_number,
offset=ex.offset,
)
except Exception as ex:
return ScriptUnitTestResult(
result=False,
error=f"Failed to execute script: {str(ex)}",
)
result_as_boolean = task.data == expected_output_context
script_unit_test_result = ScriptUnitTestResult(
result=result_as_boolean, context=task.data
)
return script_unit_test_result
@classmethod
def run_test(
cls,
task: SpiffTask,
test_identifier: str,
) -> ScriptUnitTestResult:
"""Run_test."""
# this is totally made up, but hopefully resembles what spiffworkflow ultimately does
unit_tests = task.task_spec.extensions["unitTests"]
unit_test = [
unit_test for unit_test in unit_tests if unit_test["id"] == test_identifier
][0]
input_context = None
expected_output_context = None
try:
input_context = json.loads(unit_test["inputJson"])
except json.decoder.JSONDecodeError as ex:
return ScriptUnitTestResult(
result=False,
error=f"Failed to parse inputJson: {unit_test['inputJson']}: {str(ex)}",
)
try:
expected_output_context = json.loads(unit_test["expectedOutputJson"])
except json.decoder.JSONDecodeError as ex:
return ScriptUnitTestResult(
result=False,
error=f"Failed to parse expectedOutputJson: {unit_test['expectedOutputJson']}: {str(ex)}",
)
script = task.task_spec.script
return cls.run_with_task_and_script_and_pre_post_contexts(
task, script, input_context, expected_output_context
)

View File

@ -54,13 +54,13 @@ class SecretService:
return secret_model
@staticmethod
def get_secret(key: str) -> Optional[str]:
def get_secret(key: str) -> Optional[SecretModel]:
"""Get_secret."""
secret: SecretModel = (
db.session.query(SecretModel).filter(SecretModel.key == key).first()
)
if secret is not None:
return secret.value
return secret
@staticmethod
def update_secret(

View File

@ -23,7 +23,9 @@ class ServiceTaskDelegate:
secret_prefix = "secret:" # noqa: S105
if value.startswith(secret_prefix):
key = value.removeprefix(secret_prefix)
value = SecretService().get_secret(key)
secret = SecretService().get_secret(key)
assert secret # noqa: S101
value = secret.value
return value
@staticmethod

View File

@ -468,7 +468,7 @@ class SpecFileService(FileSystemService):
)
if not correlation_property_retrieval_expressions:
raise ValidationException(
"Correlation is missing correlation property retrieval expressions: {correlation_identifier}"
f"Correlation is missing correlation property retrieval expressions: {correlation_identifier}"
)
for cpre in correlation_property_retrieval_expressions:

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1ny7jp4" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.0.0" modeler:executionPlatform="Camunda Platform" modeler:executionPlatformVersion="7.17.0">
<bpmn:process id="sample" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_10jwwqy</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_10jwwqy" sourceRef="StartEvent_1" targetRef="Activity_03fldr6" />
<bpmn:endEvent id="Event_1qb1u6a">
<bpmn:incoming>Flow_0htxke7</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0htxke7" sourceRef="script_with_unit_test_id" targetRef="Event_1qb1u6a" />
<bpmn:scriptTask id="script_with_unit_test_id" name="Script with unit test">
<bpmn:extensionElements>
<spiffworkflow:unitTests>
<spiffworkflow:unitTest id="sets_hey_to_true_if_hey_is_false">
<spiffworkflow:inputJson>{"hey": false}</spiffworkflow:inputJson>
<spiffworkflow:expectedOutputJson>{"hey": true}</spiffworkflow:expectedOutputJson>
</spiffworkflow:unitTest>
<spiffworkflow:unitTest id="sets_something_else_if_no_hey">
<spiffworkflow:inputJson>{}</spiffworkflow:inputJson>
<spiffworkflow:expectedOutputJson>{"something_else": true}</spiffworkflow:expectedOutputJson>
</spiffworkflow:unitTest>
</spiffworkflow:unitTests>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0niwe1y</bpmn:incoming>
<bpmn:outgoing>Flow_0htxke7</bpmn:outgoing>
<bpmn:script>if 'hey' in locals():
hey = True
else:
something_else = True</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_0niwe1y" sourceRef="Activity_03fldr6" targetRef="script_with_unit_test_id" />
<bpmn:scriptTask id="Activity_03fldr6" name="Set var">
<bpmn:incoming>Flow_10jwwqy</bpmn:incoming>
<bpmn:outgoing>Flow_0niwe1y</bpmn:outgoing>
<bpmn:script>hey = False</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="sample">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="132" y="102" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1qb1u6a_di" bpmnElement="Event_1qb1u6a">
<dc:Bounds x="642" y="102" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_17ohe7r_di" bpmnElement="script_with_unit_test_id">
<dc:Bounds x="440" y="80" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1fab1y2_di" bpmnElement="Activity_03fldr6">
<dc:Bounds x="250" y="80" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_10jwwqy_di" bpmnElement="Flow_10jwwqy">
<di:waypoint x="168" y="120" />
<di:waypoint x="250" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0htxke7_di" bpmnElement="Flow_0htxke7">
<di:waypoint x="540" y="120" />
<di:waypoint x="642" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0niwe1y_di" bpmnElement="Flow_0niwe1y">
<di:waypoint x="350" y="120" />
<di:waypoint x="440" y="120" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -1334,7 +1334,7 @@ class TestProcessApi(BaseTest):
api_error = json.loads(response.get_data(as_text=True))
assert api_error["code"] == "task_error"
assert (
'Activity_CauseError: TypeError:can only concatenate str (not "int") to str'
'Activity_CauseError: TypeError: can only concatenate str (not "int") to str'
in api_error["message"]
)
@ -1416,7 +1416,7 @@ class TestProcessApi(BaseTest):
assert message.subject == "Unexpected error in app"
assert (
message.body
== 'Activity_CauseError: TypeError:can only concatenate str (not "int") to str'
== 'Activity_CauseError: TypeError: can only concatenate str (not "int") to str'
)
assert message.recipients == process_model.exception_notification_addresses

View File

@ -1,5 +1,6 @@
"""Test_secret_service."""
import json
from typing import Optional
import pytest
from flask.app import Flask
@ -103,7 +104,7 @@ class TestSecretService(SecretServiceTestHelpers):
secret = SecretService().get_secret(self.test_key)
assert secret is not None
assert secret == self.test_value
assert secret.value == self.test_value
def test_get_secret_bad_key_fails(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
@ -122,10 +123,12 @@ class TestSecretService(SecretServiceTestHelpers):
user = self.find_or_create_user()
self.add_test_secret(user)
secret = SecretService.get_secret(self.test_key)
assert secret == self.test_value
assert secret
assert secret.value == self.test_value
SecretService.update_secret(self.test_key, "new_secret_value", user.id)
new_secret = SecretService.get_secret(self.test_key)
assert new_secret == "new_secret_value" # noqa: S105
assert new_secret
assert new_secret.value == "new_secret_value" # noqa: S105
def test_update_secret_bad_user_fails(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
@ -370,7 +373,8 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
)
assert secret_response
assert secret_response.status_code == 200
assert secret_response.json == self.test_value
assert secret_response.json
assert secret_response.json["value"] == self.test_value
def test_update_secret(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
@ -378,8 +382,9 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
"""Test_update_secret."""
user = self.find_or_create_user()
self.add_test_secret(user)
secret = SecretService.get_secret(self.test_key)
assert secret == self.test_value
secret: Optional[SecretModel] = SecretService.get_secret(self.test_key)
assert secret
assert secret.value == self.test_value
secret_model = SecretModel(
key=self.test_key, value="new_secret_value", creator_user_id=user.id
)
@ -389,7 +394,7 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
content_type="application/json",
data=json.dumps(SecretModelSchema().dump(secret_model)),
)
assert response.status_code == 204
assert response.status_code == 200
secret_model = SecretModel.query.filter(
SecretModel.key == self.test_key
@ -404,12 +409,12 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
self.add_test_secret(user)
secret = SecretService.get_secret(self.test_key)
assert secret
assert secret == self.test_value
assert secret.value == self.test_value
secret_response = client.delete(
f"/v1.0/secrets/{self.test_key}",
headers=self.logged_in_headers(user),
)
assert secret_response.status_code == 204
assert secret_response.status_code == 200
secret = SecretService.get_secret(self.test_key)
assert secret is None
@ -484,6 +489,6 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
f"/v1.0/secrets/allowed_process_paths/{allowed_process.id}",
headers=self.logged_in_headers(user),
)
assert response.status_code == 204
assert response.status_code == 200
allowed_processes = SecretAllowedProcessPathModel.query.all()
assert len(allowed_processes) == 0

View File

@ -0,0 +1,18 @@
"""Test_process_instance_processor."""
from flask.app import Flask
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
# it's not totally obvious we want to keep this test/file
def test_script_engine_takes_data_and_returns_expected_results(
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_script_engine_takes_data_and_returns_expected_results."""
script_engine = ProcessInstanceProcessor._script_engine
result = script_engine._evaluate("a", {"a": 1})
assert result == 1

View File

@ -0,0 +1,142 @@
"""Test Permissions."""
from flask.app import Flask
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.script_unit_test_runner import PythonScriptContext
from spiffworkflow_backend.services.script_unit_test_runner import ScriptUnitTestRunner
class TestScriptUnitTestRunner(BaseTest):
"""TestScriptUnitTestRunner."""
def test_takes_data_and_returns_expected_result(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_takes_data_and_returns_expected_result."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
process_model_id = "simple_script"
load_test_spec(process_model_id, process_group_id=process_group_id)
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id
)
)
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
"Activity_RunScript", bpmn_process_instance
)
assert task is not None
input_context: PythonScriptContext = {"a": 1}
expected_output_context: PythonScriptContext = {"a": 2}
script = "a = 2"
unit_test_result = (
ScriptUnitTestRunner.run_with_task_and_script_and_pre_post_contexts(
task, script, input_context, expected_output_context
)
)
assert unit_test_result.result
assert unit_test_result.context == {"a": 2}
def test_fails_when_expected_output_does_not_match_actual_output(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_fails_when_expected_output_does_not_match_actual_output."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
process_model_id = "simple_script"
load_test_spec(process_model_id, process_group_id=process_group_id)
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id
)
)
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
"Activity_RunScript", bpmn_process_instance
)
assert task is not None
input_context: PythonScriptContext = {"a": 1}
expected_output_context: PythonScriptContext = {"a": 2, "b": 3}
script = "a = 2"
unit_test_result = (
ScriptUnitTestRunner.run_with_task_and_script_and_pre_post_contexts(
task, script, input_context, expected_output_context
)
)
assert unit_test_result.result is not True
assert unit_test_result.context == {"a": 2}
def test_script_with_unit_tests_when_hey_is_passed_in(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
process_model_id = "script_with_unit_tests"
load_test_spec(process_model_id, process_group_id=process_group_id)
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id
)
)
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
"script_with_unit_test_id", bpmn_process_instance
)
assert task is not None
expected_output_context: PythonScriptContext = {"hey": True}
unit_test_result = ScriptUnitTestRunner.run_test(
task, "sets_hey_to_true_if_hey_is_false"
)
assert unit_test_result.result
assert unit_test_result.context == expected_output_context
def test_script_with_unit_tests_when_hey_is_not_passed_in(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_not_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
process_model_id = "script_with_unit_tests"
load_test_spec(process_model_id, process_group_id=process_group_id)
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_process_model(
process_model_id, process_group_id
)
)
task = ProcessInstanceProcessor.get_task_by_bpmn_identifier(
"script_with_unit_test_id", bpmn_process_instance
)
assert task is not None
expected_output_context: PythonScriptContext = {"something_else": True}
unit_test_result = ScriptUnitTestRunner.run_test(
task, "sets_something_else_if_no_hey"
)
assert unit_test_result.result
assert unit_test_result.context == expected_output_context