pyl now passes w/ burnettk

This commit is contained in:
jasquat 2022-10-25 15:25:42 -04:00
parent e9ef5bfeaf
commit 8659f2122e
6 changed files with 45 additions and 41 deletions

View File

@ -7,13 +7,14 @@ import os
import re
import time
from datetime import datetime
from typing import Any, TypedDict
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import NewType
from typing import Optional
from typing import Tuple
from typing import TypedDict
from typing import Union
from flask import current_app
@ -75,7 +76,8 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.task_event import TaskAction
from spiffworkflow_backend.models.task_event import TaskEventModel
from spiffworkflow_backend.models.user import UserModel, UserModelSchema
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserModelSchema
from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -107,6 +109,7 @@ DEFAULT_GLOBALS["__builtins__"]["__import__"] = _import
class PotentialOwnerIdList(TypedDict):
"""PotentialOwnerIdList."""
potential_owner_ids: list[int]
lane_assignment_id: Optional[int]
@ -121,7 +124,8 @@ class NoPotentialOwnersForTaskError(Exception):
class PotentialOwnerUserNotFoundError(Exception):
pass
"""PotentialOwnerUserNotFoundError."""
class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
"""This is a custom script processor that can be easily injected into Spiff Workflow.
@ -508,15 +512,17 @@ class ProcessInstanceProcessor:
self.save()
def raise_if_no_potential_owners(self, potential_owner_ids: list[int], message:str) -> None:
def raise_if_no_potential_owners(
self, potential_owner_ids: list[int], message: str
) -> None:
"""Raise_if_no_potential_owners."""
if not potential_owner_ids:
raise (
NoPotentialOwnersForTaskError(
message
)
)
raise (NoPotentialOwnersForTaskError(message))
def get_potential_owner_ids_from_task(self, task: SpiffTask) -> PotentialOwnerIdList:
def get_potential_owner_ids_from_task(
self, task: SpiffTask
) -> PotentialOwnerIdList:
"""Get_potential_owner_ids_from_task."""
task_spec = task.task_spec
task_lane = "process_initiator"
if task_spec.lane is not None and task_spec.lane != "":
@ -525,21 +531,19 @@ class ProcessInstanceProcessor:
potential_owner_ids = []
lane_assignment_id = None
if re.match(r"(process.?)initiator", task_lane, re.IGNORECASE):
potential_owner_ids = [
self.process_instance_model.process_initiator_id
]
potential_owner_ids = [self.process_instance_model.process_initiator_id]
elif "lane_owners" in task.data and task_lane in task.data["lane_owners"]:
for username in task.data["lane_owners"][task_lane]:
lane_owner_user = UserModel.query.filter_by(username=username).first()
if lane_owner_user is not None:
potential_owner_ids.append(lane_owner_user.id)
self.raise_if_no_potential_owners(potential_owner_ids,
f"No users found in task data lane owner list for lane: {task_lane}. The user list used: {task.data['lane_owners'][task_lane]}"
self.raise_if_no_potential_owners(
potential_owner_ids,
f"No users found in task data lane owner list for lane: {task_lane}. "
f"The user list used: {task.data['lane_owners'][task_lane]}",
)
else:
group_model = GroupModel.query.filter_by(
identifier=task_lane
).first()
group_model = GroupModel.query.filter_by(identifier=task_lane).first()
if group_model is None:
raise (
NoPotentialOwnersForTaskError(
@ -550,12 +554,14 @@ class ProcessInstanceProcessor:
i.user_id for i in group_model.user_group_assignments
]
lane_assignment_id = group_model.id
self.raise_if_no_potential_owners(potential_owner_ids, f"Could not find any users in group to assign to lane: {task_lane}")
self.raise_if_no_potential_owners(
potential_owner_ids,
f"Could not find any users in group to assign to lane: {task_lane}",
)
return {
"potential_owner_ids": potential_owner_ids,
"lane_assignment_id": lane_assignment_id
"lane_assignment_id": lane_assignment_id,
}
def save(self) -> None:
@ -591,7 +597,9 @@ class ProcessInstanceProcessor:
# filter out non-usertasks
task_spec = ready_or_waiting_task.task_spec
if not self.bpmn_process_instance._is_engine_task(task_spec):
potential_owner_hash = self.get_potential_owner_ids_from_task(ready_or_waiting_task)
potential_owner_hash = self.get_potential_owner_ids_from_task(
ready_or_waiting_task
)
extensions = ready_or_waiting_task.task_spec.extensions
form_file_name = None

View File

@ -73,8 +73,10 @@ class ProcessInstanceService:
process_instance.status = ProcessInstanceStatus.erroring.value
db.session.add(process_instance)
db.session.commit()
error_message = f"Error running waiting task for process_instance {process_instance.id}" + \
f"({process_instance.process_model_identifier}). {str(e)}"
error_message = (
f"Error running waiting task for process_instance {process_instance.id}"
+ f"({process_instance.process_model_identifier}). {str(e)}"
)
current_app.logger.error(error_message)
@staticmethod

View File

@ -37,9 +37,7 @@ class SecretService:
) -> SecretModel:
"""Add_secret."""
# encrypted_key = self.encrypt_key(key)
secret_model = SecretModel(
key=key, value=value, user_id=user_id
)
secret_model = SecretModel(key=key, value=value, user_id=user_id)
db.session.add(secret_model)
try:
db.session.commit()
@ -81,9 +79,7 @@ class SecretService:
db.session.rollback()
raise e
elif create_if_not_exists:
SecretService.add_secret(
key=key, value=value, user_id=user_id
)
SecretService.add_secret(key=key, value=value, user_id=user_id)
else:
raise ApiError(
error_code="update_secret_error",

View File

@ -64,12 +64,8 @@ class ServiceTaskDelegate:
return proxied_response.text
secret_key = parsed_response["auth"]
refreshed_token_set = json.dumps(
parsed_response["refreshed_token_set"]
)
SecretService().update_secret(
secret_key, refreshed_token_set, g.user.id
)
refreshed_token_set = json.dumps(parsed_response["refreshed_token_set"])
SecretService().update_secret(secret_key, refreshed_token_set, g.user.id)
return json.dumps(parsed_response["api_response"])

View File

@ -94,6 +94,8 @@ class TestAuthorizationService(BaseTest):
"""Test_user_can_be_added_to_active_task_on_first_login."""
initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.principal is not None
# to ensure there is a user that can be assigned to the task
self.find_or_create_user("testuser1")
AuthorizationService.import_permissions_from_yaml_file()
process_model = load_test_spec(

View File

@ -124,6 +124,7 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_sets_permission_correctly_on_active_task_when_using_dict."""
initiator_user = self.find_or_create_user("initiator_user")
finance_user = self.find_or_create_user("testuser3")
testadmin1 = self.find_or_create_user("testadmin1")
@ -135,7 +136,8 @@ class TestProcessInstanceProcessor(BaseTest):
assert finance_group is not None
process_model = load_test_spec(
process_model_id="model_with_lanes", bpmn_file_name="lanes_with_owner_dict.bpmn"
process_model_id="model_with_lanes",
bpmn_file_name="lanes_with_owner_dict.bpmn",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
@ -199,8 +201,6 @@ class TestProcessInstanceProcessor(BaseTest):
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, initiator_user
)
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, testadmin1
)
ProcessInstanceService.complete_form_task(processor, spiff_task, {}, testadmin1)
assert process_instance.status == ProcessInstanceStatus.complete.value