Better impl

This commit is contained in:
Jon Herron 2023-01-13 09:05:35 -05:00
parent c4bb1e188d
commit ab2d0cdf6d
1 changed files with 25 additions and 15 deletions

View File

@ -564,25 +564,10 @@ class ProcessInstanceProcessor:
"lane_assignment_id": lane_assignment_id,
}
def check_task_data_size(self, tasks_dict: dict) -> None:
"""CheckTaskDataSize."""
task_data = [v["data"] for v in tasks_dict.values() if len(v["data"]) > 0]
task_data_len = len(json.dumps(task_data))
task_data_limit = 1024
if task_data_len > task_data_limit:
raise (
ApiError(
error_code="task_data_size_exceeded",
message=f"Maximum task data size of {task_data_limit} exceeded."
)
)
def spiff_step_details_mapping(self) -> dict:
"""SaveSpiffStepDetails."""
bpmn_json = self.serialize()
wf_json = json.loads(bpmn_json)
self.check_task_data_size(wf_json["tasks"])
task_json = {"tasks": wf_json["tasks"], "subprocesses": wf_json["subprocesses"]}
return {
@ -1283,8 +1268,33 @@ class ProcessInstanceProcessor:
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
def user_defined_task_data(self, task_data: dict) -> dict:
"""UserDefinedTaskData."""
return {k: v for k, v in task_data.items() if k != "current_user"}
def check_task_data_size(self) -> None:
"""CheckTaskDataSize."""
tasks_to_check = self.bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
task_data = [self.user_defined_task_data(task.data) for task in tasks_to_check]
task_data_to_check = list(filter(len, task_data))
if len(task_data_to_check) == 0:
return
task_data_len = len(json.dumps(task_data_to_check))
task_data_limit = 1024
if task_data_len > task_data_limit:
raise (
ApiError(
error_code="task_data_size_exceeded",
message=f"Maximum task data size of {task_data_limit} exceeded."
)
)
def serialize(self) -> str:
"""Serialize."""
self.check_task_data_size()
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
def next_user_tasks(self) -> list[SpiffTask]: