Add cumulative task data size to script

This commit is contained in:
Jon Herron 2023-02-22 10:39:35 -05:00
parent bb6dd35bbd
commit 2fa4623876
2 changed files with 52 additions and 7 deletions

View File

@ -0,0 +1,37 @@
"""Get_data_sizes."""
from typing import Any
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.scripts.script import Script
#from spiffworkflow_backend.servces.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class GetDataSizes(Script):
"""GetDataSizes."""
@staticmethod
def requires_privileged_permissions() -> bool:
"""We have deemed this function safe to run without elevated permissions."""
return False
def get_description(self) -> str:
"""Get_description."""
return """Returns a dictionary of information about the size of task data and the python environment for the currently running process."""
def run(
self,
script_attributes_context: ScriptAttributesContext,
*_args: Any,
**kwargs: Any
) -> Any:
"""Run."""
task = script_attributes_context.task
return {
"cumulative_task_data_size": ProcessInstanceProcessor.get_task_data_size(task.workflow),
"python_env_size": 0,
}

View File

@ -1602,16 +1602,24 @@ class ProcessInstanceProcessor:
except WorkflowTaskException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
def check_task_data_size(self) -> None:
"""CheckTaskDataSize."""
tasks_to_check = self.bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
task_data = [task.data for task in tasks_to_check]
task_data_to_check = list(filter(len, task_data))
@classmethod
def _get_data_size(cls, data: Dict[Any, Any]) -> int:
data_to_check = list(filter(len, data))
try:
task_data_len = len(json.dumps(task_data_to_check))
return len(json.dumps(data_to_check))
except Exception:
task_data_len = 0
return 0
@classmethod
def get_task_data_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
tasks_to_check = bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
task_data = [task.data for task in tasks_to_check]
return cls._get_data_size(task_data)
def check_task_data_size(self) -> None:
"""CheckTaskDataSize."""
task_data_len = self.get_task_data_size(self.bpmn_process_instance)
# Not sure what the number here should be but this now matches the mysql
# max_allowed_packet variable on dev - 1073741824