add extraction, needs test

This commit is contained in:
burnettk 2022-12-02 17:15:22 -05:00
parent b1b7f322ee
commit 698c39d9e5
3 changed files with 40 additions and 25 deletions

View File

@ -81,6 +81,7 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_metadata import ProcessInstanceMetadataModel
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
@ -576,6 +577,34 @@ class ProcessInstanceProcessor:
db.session.add(details_model)
db.session.commit()
def extract_metadata(self, process_model_info: ProcessModelInfo) -> dict:
if process_model_info.metadata_extraction_paths is None:
return
if len(metadata_extraction_paths) > 0:
return
current_data = self.get_current_data()
for metadata_extraction_path in metadata_extraction_paths:
key = metadata_extraction_path["key"]
path = metadata_extraction_path["path"]
path_segments = path.split(".")
data_for_key = current_data
for path_segment in path_segments:
data_for_key = data_for_key[path_segment]
pim = ProcessInstanceMetadataModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
key=key,
).first()
if pim is None:
pim = ProcessInstanceMetadataModel(
process_instance_id=self.process_instance_model.id,
key=key,
)
pim.value = data_for_key
db.session.add(pim)
db.session.commit()
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self.process_instance_model.bpmn_json = self.serialize()
@ -602,6 +631,15 @@ class ProcessInstanceProcessor:
process_instance_id=self.process_instance_model.id
).all()
ready_or_waiting_tasks = self.get_all_ready_or_waiting_tasks()
process_model_display_name = ""
process_model_info = self.process_model_service.get_process_model(
self.process_instance_model.process_model_identifier
)
if process_model_info is not None:
process_model_display_name = process_model_info.display_name
self.extract_metadata(process_model_info)
for ready_or_waiting_task in ready_or_waiting_tasks:
# filter out non-usertasks
task_spec = ready_or_waiting_task.task_spec
@ -620,13 +658,6 @@ class ProcessInstanceProcessor:
if "formUiSchemaFilename" in properties:
ui_form_file_name = properties["formUiSchemaFilename"]
process_model_display_name = ""
process_model_info = self.process_model_service.get_process_model(
self.process_instance_model.process_model_identifier
)
if process_model_info is not None:
process_model_display_name = process_model_info.display_name
active_task = None
for at in active_tasks:
if at.task_id == str(ready_or_waiting_task.id):
@ -1151,8 +1182,8 @@ class ProcessInstanceProcessor:
def get_current_data(self) -> dict[str, Any]:
"""Get the current data for the process.
Return either most recent task data or the process data
if the process instance is complete
Return either the most recent task data or--if the process instance is complete--
the process data.
"""
if self.process_instance_model.status == "complete":
return self.get_data()

View File

@ -322,18 +322,3 @@ class ProcessInstanceService:
)
return task
@staticmethod
def serialize_flat_with_task_data(
process_instance: ProcessInstanceModel,
) -> dict[str, Any]:
"""NOTE: This is crazy slow. Put the latest task data in the database."""
"""Serialize_flat_with_task_data."""
# results = {}
# try:
# processor = ProcessInstanceProcessor(process_instance)
# process_instance.data = processor.get_current_data()
# results = process_instance.serialized_flat
# except ApiError:
results = process_instance.serialized
return results

View File

@ -1806,7 +1806,6 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 404
data = json.loads(response.get_data(as_text=True))
print(f"data: {data}")
assert data["error_code"] == "unknown_process_instance_report"
def setup_testing_instance(