Provide info about keys in task data and python env

This commit is contained in:
Jon Herron 2023-02-22 13:13:28 -05:00
parent 2afc19175e
commit 488abc03ea
2 changed files with 24 additions and 12 deletions

View File

@ -32,12 +32,17 @@ class GetDataSizes(Script):
**kwargs: Any **kwargs: Any
) -> Any: ) -> Any:
"""Run.""" """Run."""
task = script_attributes_context.task workflow = script_attributes_context.task.workflow
cumulative_task_data_size = ProcessInstanceProcessor.get_task_data_size( task_data_size = ProcessInstanceProcessor.get_task_data_size(workflow)
task.workflow task_data_keys_by_task = {
) t.task_spec.name: sorted(t.data.keys())
python_env_size = ProcessInstanceProcessor.get_python_env_size(task.workflow) for t in ProcessInstanceProcessor.get_tasks_with_data(workflow)
return { }
"cumulative_task_data_size": cumulative_task_data_size, python_env_size = ProcessInstanceProcessor.get_python_env_size(workflow)
"python_env_size": python_env_size, python_env_keys = workflow.script_engine.environment.user_defined_state().keys()
return {
"python_env_size": python_env_size,
"python_env_keys": sorted(python_env_keys),
"task_data_size": task_data_size,
"task_data_keys_by_task": task_data_keys_by_task,
} }

View File

@ -1607,14 +1607,21 @@ class ProcessInstanceProcessor:
except WorkflowTaskException as we: except WorkflowTaskException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we raise ApiError.from_workflow_exception("task_error", str(we), we) from we
@classmethod
def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
return [
task
for task in bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
if len(task.data) > 0
]
@classmethod @classmethod
def get_task_data_size(cls, bpmn_process_instance: BpmnWorkflow) -> int: def get_task_data_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
tasks_to_check = bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK) tasks_with_data = cls.get_tasks_with_data(bpmn_process_instance)
task_data = [task.data for task in tasks_to_check] all_task_data = [task.data for task in tasks_with_data]
task_data_to_check = list(filter(len, task_data))
try: try:
return len(json.dumps(task_data_to_check)) return len(json.dumps(all_task_data))
except Exception: except Exception:
return 0 return 0