Merge pull request #176 from sartography/feature/insert_or_update_json_data

Feature/insert or update json data
This commit is contained in:
Kevin Burnett 2023-03-13 14:53:49 -07:00 committed by GitHub
commit e847d1ac1b
3 changed files with 87 additions and 60 deletions

View File

@ -1101,7 +1101,7 @@ class ProcessInstanceProcessor:
self._add_bpmn_process_definitions(bpmn_spec_dict) self._add_bpmn_process_definitions(bpmn_spec_dict)
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent, new_task_models, new_json_data_models = ( bpmn_process_parent, new_task_models, new_json_data_dicts = (
TaskService.add_bpmn_process( TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model process_instance_data_dict, self.process_instance_model
) )
@ -1118,9 +1118,10 @@ class ProcessInstanceProcessor:
bpmn_process_guid=subprocess_task_id, bpmn_process_guid=subprocess_task_id,
) )
new_task_models.update(subprocess_new_task_models) new_task_models.update(subprocess_new_task_models)
new_json_data_models.update(subprocess_new_json_data_models) new_json_data_dicts.update(subprocess_new_json_data_models)
db.session.bulk_save_objects(new_task_models.values()) db.session.bulk_save_objects(new_task_models.values())
db.session.bulk_save_objects(new_json_data_models.values())
TaskService.insert_or_update_json_data_records(new_json_data_dicts)
def save(self) -> None: def save(self) -> None:
"""Saves the current state of this processor to the database.""" """Saves the current state of this processor to the database."""
@ -1908,11 +1909,18 @@ class ProcessInstanceProcessor:
db.session.add(details_model) db.session.add(details_model)
# ####### # #######
json_data = TaskService.update_task_model( json_data_dict = TaskService.update_task_model(
task_model, spiff_task, self._serializer task_model, spiff_task, self._serializer
) )
if json_data is not None: if json_data_dict is not None:
db.session.add(json_data) json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=json_data_dict["hash"])
.first()
)
if json_data is None:
json_data = JsonDataModel(**json_data_dict)
db.session.add(json_data)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save() self.save()

View File

@ -2,11 +2,15 @@ import json
from hashlib import sha256 from hashlib import sha256
from typing import Optional from typing import Optional
from typing import Tuple from typing import Tuple
from typing import TypedDict
from flask import current_app
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskStateNames from SpiffWorkflow.task import TaskStateNames
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
@ -15,25 +19,42 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel # noqa: F401
class JsonDataDict(TypedDict):
hash: str
data: dict
class TaskService: class TaskService:
@classmethod @classmethod
def update_task_data_on_task_model( def insert_or_update_json_data_records(
cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]
) -> None:
list_of_dicts = [*json_data_hash_to_json_data_dict_mapping.values()]
if len(list_of_dicts) > 0:
on_duplicate_key_stmt = None
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql":
insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data, status="U"
)
else:
insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_conflict_do_nothing(
index_elements=["hash"]
)
db.session.execute(on_duplicate_key_stmt)
@classmethod
def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict cls, task_model: TaskModel, task_data_dict: dict
) -> Optional[JsonDataModel]: ) -> Optional[JsonDataDict]:
task_data_json = json.dumps(task_data_dict, sort_keys=True) task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest() task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_to_return = None json_data_dict: Optional[JsonDataDict] = None
if task_model.json_data_hash != task_data_hash: if task_model.json_data_hash != task_data_hash:
json_data = ( json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
json_data_to_return = json_data
task_model.json_data_hash = task_data_hash task_model.json_data_hash = task_data_hash
return json_data_to_return return json_data_dict
@classmethod @classmethod
def update_task_model( def update_task_model(
@ -41,7 +62,7 @@ class TaskService:
task_model: TaskModel, task_model: TaskModel,
spiff_task: SpiffTask, spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> Optional[JsonDataModel]: ) -> Optional[JsonDataDict]:
"""Updates properties_json and data on given task_model. """Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds. This will NOT update start_in_seconds or end_in_seconds.
@ -51,8 +72,10 @@ class TaskService:
spiff_task_data = new_properties_json.pop("data") spiff_task_data = new_properties_json.pop("data")
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json["state"]] task_model.state = TaskStateNames[new_properties_json["state"]]
json_data = cls.update_task_data_on_task_model(task_model, spiff_task_data) json_data_dict = cls._update_task_data_on_task_model(
return json_data task_model, spiff_task_data
)
return json_data_dict
@classmethod @classmethod
def find_or_create_task_model_from_spiff_task( def find_or_create_task_model_from_spiff_task(
@ -64,7 +87,7 @@ class TaskService:
Optional[BpmnProcessModel], Optional[BpmnProcessModel],
TaskModel, TaskModel,
dict[str, TaskModel], dict[str, TaskModel],
dict[str, JsonDataModel], dict[str, JsonDataDict],
]: ]:
spiff_task_guid = str(spiff_task.id) spiff_task_guid = str(spiff_task.id)
task_model: Optional[TaskModel] = TaskModel.query.filter_by( task_model: Optional[TaskModel] = TaskModel.query.filter_by(
@ -72,9 +95,9 @@ class TaskService:
).first() ).first()
bpmn_process = None bpmn_process = None
new_task_models: dict[str, TaskModel] = {} new_task_models: dict[str, TaskModel] = {}
new_json_data_models: dict[str, JsonDataModel] = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
if task_model is None: if task_model is None:
bpmn_process, new_task_models, new_json_data_models = cls.task_bpmn_process( bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process(
spiff_task, process_instance, serializer spiff_task, process_instance, serializer
) )
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
@ -82,7 +105,7 @@ class TaskService:
task_model = TaskModel( task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
) )
return (bpmn_process, task_model, new_task_models, new_json_data_models) return (bpmn_process, task_model, new_task_models, new_json_data_dicts)
@classmethod @classmethod
def task_subprocess( def task_subprocess(
@ -107,17 +130,17 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task) subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None bpmn_process: Optional[BpmnProcessModel] = None
new_task_models: dict[str, TaskModel] = {} new_task_models: dict[str, TaskModel] = {}
new_json_data_models: dict[str, JsonDataModel] = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
if subprocess is None: if subprocess is None:
bpmn_process = process_instance.bpmn_process bpmn_process = process_instance.bpmn_process
# This is the top level workflow, which has no guid # This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None: if process_instance.bpmn_process_id is None:
bpmn_process, new_task_models, new_json_data_models = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict( serializer.workflow_to_dict(
spiff_task.workflow._get_outermost_workflow() spiff_task.workflow._get_outermost_workflow()
@ -130,7 +153,7 @@ class TaskService:
guid=subprocess_guid guid=subprocess_guid
).first() ).first()
if bpmn_process is None: if bpmn_process is None:
bpmn_process, new_task_models, new_json_data_models = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess), serializer.workflow_to_dict(subprocess),
process_instance, process_instance,
@ -138,7 +161,7 @@ class TaskService:
subprocess_guid, subprocess_guid,
) )
) )
return (bpmn_process, new_task_models, new_json_data_models) return (bpmn_process, new_task_models, new_json_data_dicts)
@classmethod @classmethod
def add_bpmn_process( def add_bpmn_process(
@ -147,7 +170,7 @@ class TaskService:
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
"""This creates and adds a bpmn_process to the Db session. """This creates and adds a bpmn_process to the Db session.
It will also add tasks and relating json_data entries if the bpmn_process is new. It will also add tasks and relating json_data entries if the bpmn_process is new.
@ -157,7 +180,7 @@ class TaskService:
bpmn_process_data_dict = bpmn_process_dict.pop("data") bpmn_process_data_dict = bpmn_process_dict.pop("data")
new_task_models = {} new_task_models = {}
new_json_data_models = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
bpmn_process = None bpmn_process = None
if bpmn_process_parent is not None: if bpmn_process_parent is not None:
@ -179,16 +202,10 @@ class TaskService:
bpmn_process_data_json.encode("utf8") bpmn_process_data_json.encode("utf8")
).hexdigest() ).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash: if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = ( new_json_data_dicts[bpmn_process_data_hash] = {
db.session.query(JsonDataModel.id) "hash": bpmn_process_data_hash,
.filter_by(hash=bpmn_process_data_hash) "data": bpmn_process_data_dict,
.first() }
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data_dict
)
new_json_data_models[bpmn_process_data_hash] = json_data
bpmn_process.json_data_hash = bpmn_process_data_hash bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None: if bpmn_process_parent is None:
@ -220,11 +237,11 @@ class TaskService:
task_model.state = TaskStateNames[state_int] task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties task_model.properties_json = task_properties
json_data = TaskService.update_task_data_on_task_model( json_data_dict = TaskService._update_task_data_on_task_model(
task_model, task_data_dict task_model, task_data_dict
) )
new_task_models[task_model.guid] = task_model new_task_models[task_model.guid] = task_model
if json_data is not None: if json_data_dict is not None:
new_json_data_models[json_data.hash] = json_data new_json_data_dicts[json_data_dict["hash"]] = json_data_dict
return (bpmn_process, new_task_models, new_json_data_models) return (bpmn_process, new_task_models, new_json_data_dicts)

View File

@ -12,7 +12,6 @@ from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel, MessageInstanceCorrelationRuleModel,
@ -20,6 +19,7 @@ from spiffworkflow_backend.models.message_instance_correlation import (
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.task_service import JsonDataDict
from spiffworkflow_backend.services.task_service import TaskService from spiffworkflow_backend.services.task_service import TaskService
@ -60,7 +60,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.task_models: dict[str, TaskModel] = {} self.task_models: dict[str, TaskModel] = {}
self.json_data_models: dict[str, JsonDataModel] = {} self.json_data_dicts: dict[str, JsonDataDict] = {}
self.serializer = serializer self.serializer = serializer
def should_update_task_model(self) -> bool: def should_update_task_model(self) -> bool:
@ -72,14 +72,14 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def will_complete_task(self, spiff_task: SpiffTask) -> None: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model(): if self.should_update_task_model():
_bpmn_process, task_model, new_task_models, new_json_data_models = ( _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer spiff_task, self.process_instance, self.serializer
) )
) )
self.current_task_model = task_model self.current_task_model = task_model
self.task_models.update(new_task_models) self.task_models.update(new_task_models)
self.json_data_models.update(new_json_data_models) self.json_data_dicts.update(new_json_data_dicts)
self.current_task_model.start_in_seconds = time.time() self.current_task_model.start_in_seconds = time.time()
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task) self.secondary_engine_step_delegate.will_complete_task(spiff_task)
@ -87,18 +87,20 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def did_complete_task(self, spiff_task: SpiffTask) -> None: def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model(): if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time() self.current_task_model.end_in_seconds = time.time()
json_data = TaskService.update_task_model( json_data_dict = TaskService.update_task_model(
self.current_task_model, spiff_task, self.serializer self.current_task_model, spiff_task, self.serializer
) )
if json_data is not None: if json_data_dict is not None:
self.json_data_models[json_data.hash] = json_data self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
self.task_models[self.current_task_model.guid] = self.current_task_model self.task_models[self.current_task_model.guid] = self.current_task_model
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task) self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, _commit: bool = True) -> None: def save(self, _commit: bool = True) -> None:
db.session.bulk_save_objects(self.task_models.values()) db.session.bulk_save_objects(self.task_models.values())
db.session.bulk_save_objects(self.json_data_models.values())
TaskService.insert_or_update_json_data_records(self.json_data_dicts)
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(commit=False) self.secondary_engine_step_delegate.save(commit=False)
db.session.commit() db.session.commit()
@ -113,19 +115,19 @@ class TaskModelSavingDelegate(EngineStepDelegate):
| TaskState.MAYBE | TaskState.MAYBE
| TaskState.LIKELY | TaskState.LIKELY
): ):
_bpmn_process, task_model, new_task_models, new_json_data_models = ( _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer waiting_spiff_task, self.process_instance, self.serializer
) )
) )
self.task_models.update(new_task_models) self.task_models.update(new_task_models)
self.json_data_models.update(new_json_data_models) self.json_data_dicts.update(new_json_data_dicts)
json_data = TaskService.update_task_model( json_data_dict = TaskService.update_task_model(
task_model, waiting_spiff_task, self.serializer task_model, waiting_spiff_task, self.serializer
) )
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model
if json_data is not None: if json_data_dict is not None:
self.json_data_models[json_data.hash] = json_data self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
class StepDetailLoggingDelegate(EngineStepDelegate): class StepDetailLoggingDelegate(EngineStepDelegate):