fixing an untyped method.

This commit is contained in:
Dan 2022-12-05 12:55:44 -05:00
parent 3a09b45765
commit e06500821e

View File

@ -97,6 +97,7 @@ from spiffworkflow_backend.services.service_task_service import ServiceTaskDeleg
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.user_service import UserService
# Sorry about all this crap. I wanted to move this thing to another file, but
# importing a bunch of types causes circular imports.
@ -178,16 +179,16 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
)
return Script.generate_augmented_list(script_attributes_context)
def evaluate(self, task: SpiffTask, expression: str, external_methods=None) -> Any:
def evaluate(self, task: SpiffTask, expression: str, external_methods: Any = None) -> Any:
"""Evaluate."""
return self._evaluate(expression, task.data, task, external_methods)
def _evaluate(
self,
expression: str,
context: Dict[str, Union[Box, str]],
task: Optional[SpiffTask] = None,
external_methods: Optional[Dict[str, Any]] = None,
self,
expression: str,
context: Dict[str, Union[Box, str]],
task: Optional[SpiffTask] = None,
external_methods: Optional[Dict[str, Any]] = None,
) -> Any:
"""_evaluate."""
methods = self.__get_augment_methods(task)
@ -211,7 +212,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
) from exception
def execute(
self, task: SpiffTask, script: str, external_methods: Any = None
self, task: SpiffTask, script: str, external_methods: Any = None
) -> None:
"""Execute."""
try:
@ -225,10 +226,10 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
raise WorkflowTaskExecException(task, f" {script}, {e}", e) from e
def call_service(
self,
operation_name: str,
operation_params: Dict[str, Any],
task_data: Dict[str, Any],
self,
operation_name: str,
operation_params: Dict[str, Any],
task_data: Dict[str, Any],
) -> Any:
"""CallService."""
return ServiceTaskDelegate.call_connector(
@ -275,7 +276,7 @@ class ProcessInstanceProcessor:
# * get_spec, which returns a spec and any subprocesses (as IdToBpmnProcessSpecMapping dict)
# * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow
def __init__(
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
) -> None:
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
tld = current_app.config["THREAD_LOCAL_DATA"]
@ -302,7 +303,7 @@ class ProcessInstanceProcessor:
)
else:
bpmn_json_length = len(process_instance_model.bpmn_json.encode("utf-8"))
megabyte = float(1024**2)
megabyte = float(1024 ** 2)
json_size = bpmn_json_length / megabyte
if json_size > 1:
wf_json = json.loads(process_instance_model.bpmn_json)
@ -316,22 +317,22 @@ class ProcessInstanceProcessor:
len(json.dumps(test_spec).encode("utf-8")) / megabyte
)
message = (
"Workflow "
+ process_instance_model.process_model_identifier
+ f" JSON Size is over 1MB:{json_size:.2f} MB"
"Workflow "
+ process_instance_model.process_model_identifier
+ f" JSON Size is over 1MB:{json_size:.2f} MB"
)
message += f"\n Task Size: {task_size}"
message += f"\n Spec Size: {spec_size}"
current_app.logger.warning(message)
def check_sub_specs(
test_spec: dict, indent: int = 0, show_all: bool = False
test_spec: dict, indent: int = 0, show_all: bool = False
) -> None:
"""Check_sub_specs."""
for my_spec_name in test_spec["task_specs"]:
my_spec = test_spec["task_specs"][my_spec_name]
my_spec_size = (
len(json.dumps(my_spec).encode("utf-8")) / megabyte
len(json.dumps(my_spec).encode("utf-8")) / megabyte
)
if my_spec_size > 0.1 or show_all:
current_app.logger.warning(
@ -367,13 +368,13 @@ class ProcessInstanceProcessor:
raise ApiError(
error_code="unexpected_process_instance_structure",
message="Failed to deserialize process_instance"
" '%s' due to a mis-placed or missing task '%s'"
% (self.process_model_identifier, str(ke)),
" '%s' due to a mis-placed or missing task '%s'"
% (self.process_model_identifier, str(ke)),
) from ke
@classmethod
def get_process_model_and_subprocesses(
cls, process_model_identifier: str
cls, process_model_identifier: str
) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]:
"""Get_process_model_and_subprocesses."""
process_model_info = ProcessModelService.get_process_model(
@ -391,7 +392,7 @@ class ProcessInstanceProcessor:
@classmethod
def get_bpmn_process_instance_from_process_model(
cls, process_model_identifier: str
cls, process_model_identifier: str
) -> BpmnWorkflow:
"""Get_all_bpmn_process_identifiers_for_process_model."""
(bpmn_process_spec, subprocesses) = cls.get_process_model_and_subprocesses(
@ -416,7 +417,7 @@ class ProcessInstanceProcessor:
return current_user
def add_user_info_to_process_instance(
self, bpmn_process_instance: BpmnWorkflow
self, bpmn_process_instance: BpmnWorkflow
) -> None:
"""Add_user_info_to_process_instance."""
current_user = self.current_user()
@ -429,8 +430,8 @@ class ProcessInstanceProcessor:
@staticmethod
def get_bpmn_process_instance_from_workflow_spec(
spec: BpmnProcessSpec,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
spec: BpmnProcessSpec,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
"""Get_bpmn_process_instance_from_workflow_spec."""
return BpmnWorkflow(
@ -441,10 +442,10 @@ class ProcessInstanceProcessor:
@staticmethod
def __get_bpmn_process_instance(
process_instance_model: ProcessInstanceModel,
spec: Optional[BpmnProcessSpec] = None,
validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
process_instance_model: ProcessInstanceModel,
spec: Optional[BpmnProcessSpec] = None,
validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
"""__get_bpmn_process_instance."""
if process_instance_model.bpmn_json:
@ -487,14 +488,14 @@ class ProcessInstanceProcessor:
self.save()
def raise_if_no_potential_owners(
self, potential_owner_ids: list[int], message: str
self, potential_owner_ids: list[int], message: str
) -> None:
"""Raise_if_no_potential_owners."""
if not potential_owner_ids:
raise (NoPotentialOwnersForTaskError(message))
def get_potential_owner_ids_from_task(
self, task: SpiffTask
self, task: SpiffTask
) -> PotentialOwnerIdList:
"""Get_potential_owner_ids_from_task."""
task_spec = task.task_spec
@ -666,7 +667,7 @@ class ProcessInstanceProcessor:
@staticmethod
def backfill_missing_spec_reference_records(
bpmn_process_identifier: str,
bpmn_process_identifier: str,
) -> Optional[str]:
"""Backfill_missing_spec_reference_records."""
process_models = ProcessModelService.get_process_models(recursive=True)
@ -687,7 +688,7 @@ class ProcessInstanceProcessor:
@staticmethod
def bpmn_file_full_path_from_bpmn_process_identifier(
bpmn_process_identifier: str,
bpmn_process_identifier: str,
) -> str:
"""Bpmn_file_full_path_from_bpmn_process_identifier."""
if bpmn_process_identifier is None:
@ -697,8 +698,8 @@ class ProcessInstanceProcessor:
spec_reference = (
SpecReferenceCache.query.filter_by(identifier=bpmn_process_identifier)
.filter_by(type="process")
.first()
.filter_by(type="process")
.first()
)
bpmn_file_full_path = None
if spec_reference is None:
@ -717,15 +718,15 @@ class ProcessInstanceProcessor:
ApiError(
error_code="could_not_find_bpmn_process_identifier",
message="Could not find the the given bpmn process identifier from any sources: %s"
% bpmn_process_identifier,
% bpmn_process_identifier,
)
)
return os.path.abspath(bpmn_file_full_path)
@staticmethod
def update_spiff_parser_with_all_process_dependency_files(
parser: BpmnDmnParser,
processed_identifiers: Optional[set[str]] = None,
parser: BpmnDmnParser,
processed_identifiers: Optional[set[str]] = None,
) -> None:
"""Update_spiff_parser_with_all_process_dependency_files."""
if processed_identifiers is None:
@ -763,7 +764,7 @@ class ProcessInstanceProcessor:
@staticmethod
def get_spec(
files: List[File], process_model_info: ProcessModelInfo
files: List[File], process_model_info: ProcessModelInfo
) -> Tuple[BpmnProcessSpec, IdToBpmnProcessSpecMapping]:
"""Returns a SpiffWorkflow specification for the given process_instance spec, using the files provided."""
parser = ProcessInstanceProcessor.get_parser()
@ -777,14 +778,14 @@ class ProcessInstanceProcessor:
dmn: etree.Element = etree.fromstring(data)
parser.add_dmn_xml(dmn, filename=file.name)
if (
process_model_info.primary_process_id is None
or process_model_info.primary_process_id == ""
process_model_info.primary_process_id is None
or process_model_info.primary_process_id == ""
):
raise (
ApiError(
error_code="no_primary_bpmn_error",
message="There is no primary BPMN process id defined for process_model %s"
% process_model_info.id,
% process_model_info.id,
)
)
ProcessInstanceProcessor.update_spiff_parser_with_all_process_dependency_files(
@ -802,7 +803,7 @@ class ProcessInstanceProcessor:
raise ApiError(
error_code="process_instance_validation_error",
message="Failed to parse the Workflow Specification. "
+ "Error is '%s.'" % str(ve),
+ "Error is '%s.'" % str(ve),
file_name=ve.filename,
task_id=ve.id,
tag=ve.tag,
@ -849,12 +850,12 @@ class ProcessInstanceProcessor:
message_correlations = []
for (
message_correlation_key,
message_correlation_properties,
message_correlation_key,
message_correlation_properties,
) in bpmn_message.correlations.items():
for (
message_correlation_property_identifier,
message_correlation_property_value,
message_correlation_property_identifier,
message_correlation_property_value,
) in message_correlation_properties.items():
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
@ -946,7 +947,7 @@ class ProcessInstanceProcessor:
db.session.add(message_instance)
for (
spiff_correlation_property
spiff_correlation_property
) in waiting_task.task_spec.event_definition.correlation_properties:
# NOTE: we may have to cycle through keys here
# not sure yet if it's valid for a property to be associated with multiple keys
@ -956,9 +957,9 @@ class ProcessInstanceProcessor:
process_instance_id=self.process_instance_model.id,
name=correlation_key_name,
)
.join(MessageCorrelationPropertyModel)
.filter_by(identifier=spiff_correlation_property.name)
.first()
.join(MessageCorrelationPropertyModel)
.filter_by(identifier=spiff_correlation_property.name)
.first()
)
message_correlation_message_instance = (
MessageCorrelationMessageInstanceModel(
@ -1062,12 +1063,12 @@ class ProcessInstanceProcessor:
endtasks = []
if self.bpmn_process_instance.is_completed():
for task in SpiffTask.Iterator(
self.bpmn_process_instance.task_tree, TaskState.ANY_MASK
self.bpmn_process_instance.task_tree, TaskState.ANY_MASK
):
# Assure that we find the end event for this process_instance, and not for any sub-process_instances.
if (
isinstance(task.task_spec, EndEvent)
and task.workflow == self.bpmn_process_instance
isinstance(task.task_spec, EndEvent)
and task.workflow == self.bpmn_process_instance
):
endtasks.append(task)
if len(endtasks) > 0:
@ -1103,8 +1104,8 @@ class ProcessInstanceProcessor:
return task
for task in ready_tasks:
if (
self.bpmn_process_instance.last_task
and task.parent == last_user_task.parent
self.bpmn_process_instance.last_task
and task.parent == last_user_task.parent
):
return task
@ -1114,7 +1115,7 @@ class ProcessInstanceProcessor:
# and return that
next_task = None
for task in SpiffTask.Iterator(
self.bpmn_process_instance.task_tree, TaskState.NOT_FINISHED_MASK
self.bpmn_process_instance.task_tree, TaskState.NOT_FINISHED_MASK
):
next_task = task
return next_task
@ -1198,7 +1199,7 @@ class ProcessInstanceProcessor:
t
for t in all_tasks
if not self.bpmn_process_instance._is_engine_task(t.task_spec)
and t.state in [TaskState.COMPLETED, TaskState.CANCELLED]
and t.state in [TaskState.COMPLETED, TaskState.CANCELLED]
]
def get_all_waiting_tasks(self) -> list[SpiffTask]:
@ -1213,7 +1214,7 @@ class ProcessInstanceProcessor:
@classmethod
def get_task_by_bpmn_identifier(
cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow
cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow
) -> Optional[SpiffTask]:
"""Get_task_by_id."""
all_tasks = bpmn_process_instance.get_tasks(TaskState.ANY_MASK)