mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-01-13 11:05:56 +00:00
Merge pull request #175 from sartography/feature/bulk_insert_tasks
Feature/bulk insert tasks
This commit is contained in:
commit
67c7ba9bd8
1
.flake8
1
.flake8
@ -2,6 +2,7 @@
|
||||
select = B,B9,C,D,DAR,E,F,N,RST,S,W
|
||||
ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
|
||||
max-line-length = 120
|
||||
extend-ignore = E203
|
||||
max-complexity = 30
|
||||
docstring-convention = google
|
||||
rst-roles = class,const,func,meth,mod,ref
|
||||
|
@ -2,6 +2,7 @@
|
||||
select = B,B9,C,D,DAR,E,F,N,RST,S,W
|
||||
ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
|
||||
max-line-length = 120
|
||||
extend-ignore = E203
|
||||
max-complexity = 30
|
||||
docstring-convention = google
|
||||
rst-roles = class,const,func,meth,mod,ref
|
||||
|
@ -613,14 +613,37 @@ class ProcessInstanceProcessor:
|
||||
).first()
|
||||
bpmn_process_dict = {"data": json_data.data, "tasks": {}}
|
||||
bpmn_process_dict.update(bpmn_process.properties_json)
|
||||
tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
|
||||
for task in tasks:
|
||||
json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first()
|
||||
bpmn_process_dict["tasks"][task.guid] = task.properties_json
|
||||
bpmn_process_dict["tasks"][task.guid]["data"] = json_data.data
|
||||
|
||||
if get_tasks:
|
||||
tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
|
||||
cls._get_tasks_dict(tasks, bpmn_process_dict)
|
||||
return bpmn_process_dict
|
||||
|
||||
@classmethod
|
||||
def _get_tasks_dict(
|
||||
cls,
|
||||
tasks: list[TaskModel],
|
||||
spiff_bpmn_process_dict: dict,
|
||||
bpmn_subprocess_id_to_guid_mappings: Optional[dict] = None,
|
||||
) -> None:
|
||||
json_data_hashes = set()
|
||||
for task in tasks:
|
||||
json_data_hashes.add(task.json_data_hash)
|
||||
json_data_records = JsonDataModel.query.filter(JsonDataModel.hash.in_(json_data_hashes)).all() # type: ignore
|
||||
json_data_mappings = {}
|
||||
for json_data_record in json_data_records:
|
||||
json_data_mappings[json_data_record.hash] = json_data_record.data
|
||||
for task in tasks:
|
||||
tasks_dict = spiff_bpmn_process_dict["tasks"]
|
||||
if bpmn_subprocess_id_to_guid_mappings:
|
||||
bpmn_subprocess_guid = bpmn_subprocess_id_to_guid_mappings[
|
||||
task.bpmn_process_id
|
||||
]
|
||||
tasks_dict = spiff_bpmn_process_dict["subprocesses"][
|
||||
bpmn_subprocess_guid
|
||||
]["tasks"]
|
||||
tasks_dict[task.guid] = task.properties_json
|
||||
tasks_dict[task.guid]["data"] = json_data_mappings[task.json_data_hash]
|
||||
|
||||
@classmethod
|
||||
def _get_full_bpmn_process_dict(
|
||||
cls, process_instance_model: ProcessInstanceModel
|
||||
@ -655,9 +678,11 @@ class ProcessInstanceProcessor:
|
||||
bpmn_subprocesses = BpmnProcessModel.query.filter_by(
|
||||
parent_process_id=bpmn_process.id
|
||||
).all()
|
||||
bpmn_subprocess_ids = {}
|
||||
bpmn_subprocess_id_to_guid_mappings = {}
|
||||
for bpmn_subprocess in bpmn_subprocesses:
|
||||
bpmn_subprocess_ids[bpmn_subprocess.id] = bpmn_subprocess.guid
|
||||
bpmn_subprocess_id_to_guid_mappings[bpmn_subprocess.id] = (
|
||||
bpmn_subprocess.guid
|
||||
)
|
||||
single_bpmn_process_dict = cls._get_bpmn_process_dict(
|
||||
bpmn_subprocess
|
||||
)
|
||||
@ -666,19 +691,11 @@ class ProcessInstanceProcessor:
|
||||
] = single_bpmn_process_dict
|
||||
|
||||
tasks = TaskModel.query.filter(
|
||||
TaskModel.bpmn_process_id.in_(bpmn_subprocess_ids.keys()) # type: ignore
|
||||
TaskModel.bpmn_process_id.in_(bpmn_subprocess_id_to_guid_mappings.keys()) # type: ignore
|
||||
).all()
|
||||
for task in tasks:
|
||||
bpmn_subprocess_guid = bpmn_subprocess_ids[task.bpmn_process_id]
|
||||
json_data = JsonDataModel.query.filter_by(
|
||||
hash=task.json_data_hash
|
||||
).first()
|
||||
spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess_guid][
|
||||
"tasks"
|
||||
][task.guid] = task.properties_json
|
||||
spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess_guid][
|
||||
"tasks"
|
||||
][task.guid]["data"] = json_data.data
|
||||
cls._get_tasks_dict(
|
||||
tasks, spiff_bpmn_process_dict, bpmn_subprocess_id_to_guid_mappings
|
||||
)
|
||||
|
||||
return spiff_bpmn_process_dict
|
||||
|
||||
@ -1084,16 +1101,26 @@ class ProcessInstanceProcessor:
|
||||
self._add_bpmn_process_definitions(bpmn_spec_dict)
|
||||
|
||||
subprocesses = process_instance_data_dict.pop("subprocesses")
|
||||
bpmn_process_parent = TaskService.add_bpmn_process(
|
||||
process_instance_data_dict, self.process_instance_model
|
||||
bpmn_process_parent, new_task_models, new_json_data_models = (
|
||||
TaskService.add_bpmn_process(
|
||||
process_instance_data_dict, self.process_instance_model
|
||||
)
|
||||
)
|
||||
for subprocess_task_id, subprocess_properties in subprocesses.items():
|
||||
TaskService.add_bpmn_process(
|
||||
(
|
||||
_bpmn_subprocess,
|
||||
subprocess_new_task_models,
|
||||
subprocess_new_json_data_models,
|
||||
) = TaskService.add_bpmn_process(
|
||||
subprocess_properties,
|
||||
self.process_instance_model,
|
||||
bpmn_process_parent,
|
||||
bpmn_process_guid=subprocess_task_id,
|
||||
)
|
||||
new_task_models.update(subprocess_new_task_models)
|
||||
new_json_data_models.update(subprocess_new_json_data_models)
|
||||
db.session.bulk_save_objects(new_task_models.values())
|
||||
db.session.bulk_save_objects(new_json_data_models.values())
|
||||
|
||||
def save(self) -> None:
|
||||
"""Saves the current state of this processor to the database."""
|
||||
@ -1881,9 +1908,11 @@ class ProcessInstanceProcessor:
|
||||
db.session.add(details_model)
|
||||
# #######
|
||||
|
||||
TaskService.update_task_model_and_add_to_db_session(
|
||||
json_data = TaskService.update_task_model(
|
||||
task_model, spiff_task, self._serializer
|
||||
)
|
||||
if json_data is not None:
|
||||
db.session.add(json_data)
|
||||
|
||||
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
|
||||
self.save()
|
||||
|
@ -19,9 +19,10 @@ class TaskService:
|
||||
@classmethod
|
||||
def update_task_data_on_task_model(
|
||||
cls, task_model: TaskModel, task_data_dict: dict
|
||||
) -> None:
|
||||
) -> Optional[JsonDataModel]:
|
||||
task_data_json = json.dumps(task_data_dict, sort_keys=True)
|
||||
task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()
|
||||
json_data_to_return = None
|
||||
if task_model.json_data_hash != task_data_hash:
|
||||
json_data = (
|
||||
db.session.query(JsonDataModel.id)
|
||||
@ -30,26 +31,28 @@ class TaskService:
|
||||
)
|
||||
if json_data is None:
|
||||
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
|
||||
db.session.add(json_data)
|
||||
json_data_to_return = json_data
|
||||
task_model.json_data_hash = task_data_hash
|
||||
return json_data_to_return
|
||||
|
||||
@classmethod
|
||||
def update_task_model_and_add_to_db_session(
|
||||
def update_task_model(
|
||||
cls,
|
||||
task_model: TaskModel,
|
||||
spiff_task: SpiffTask,
|
||||
serializer: BpmnWorkflowSerializer,
|
||||
) -> None:
|
||||
) -> Optional[JsonDataModel]:
|
||||
"""Updates properties_json and data on given task_model.
|
||||
|
||||
This will NOT update start_in_seconds or end_in_seconds.
|
||||
It also returns the relating json_data object so they can be imported later.
|
||||
"""
|
||||
new_properties_json = serializer.task_to_dict(spiff_task)
|
||||
spiff_task_data = new_properties_json.pop("data")
|
||||
task_model.properties_json = new_properties_json
|
||||
task_model.state = TaskStateNames[new_properties_json["state"]]
|
||||
cls.update_task_data_on_task_model(task_model, spiff_task_data)
|
||||
db.session.add(task_model)
|
||||
json_data = cls.update_task_data_on_task_model(task_model, spiff_task_data)
|
||||
return json_data
|
||||
|
||||
@classmethod
|
||||
def find_or_create_task_model_from_spiff_task(
|
||||
@ -57,13 +60,21 @@ class TaskService:
|
||||
spiff_task: SpiffTask,
|
||||
process_instance: ProcessInstanceModel,
|
||||
serializer: BpmnWorkflowSerializer,
|
||||
) -> TaskModel:
|
||||
) -> Tuple[
|
||||
Optional[BpmnProcessModel],
|
||||
TaskModel,
|
||||
dict[str, TaskModel],
|
||||
dict[str, JsonDataModel],
|
||||
]:
|
||||
spiff_task_guid = str(spiff_task.id)
|
||||
task_model: Optional[TaskModel] = TaskModel.query.filter_by(
|
||||
guid=spiff_task_guid
|
||||
).first()
|
||||
bpmn_process = None
|
||||
new_task_models: dict[str, TaskModel] = {}
|
||||
new_json_data_models: dict[str, JsonDataModel] = {}
|
||||
if task_model is None:
|
||||
bpmn_process = cls.task_bpmn_process(
|
||||
bpmn_process, new_task_models, new_json_data_models = cls.task_bpmn_process(
|
||||
spiff_task, process_instance, serializer
|
||||
)
|
||||
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
|
||||
@ -71,7 +82,7 @@ class TaskService:
|
||||
task_model = TaskModel(
|
||||
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
|
||||
)
|
||||
return task_model
|
||||
return (bpmn_process, task_model, new_task_models, new_json_data_models)
|
||||
|
||||
@classmethod
|
||||
def task_subprocess(
|
||||
@ -96,34 +107,38 @@ class TaskService:
|
||||
spiff_task: SpiffTask,
|
||||
process_instance: ProcessInstanceModel,
|
||||
serializer: BpmnWorkflowSerializer,
|
||||
) -> BpmnProcessModel:
|
||||
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
|
||||
subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
|
||||
bpmn_process: Optional[BpmnProcessModel] = None
|
||||
new_task_models: dict[str, TaskModel] = {}
|
||||
new_json_data_models: dict[str, JsonDataModel] = {}
|
||||
if subprocess is None:
|
||||
bpmn_process = process_instance.bpmn_process
|
||||
# This is the top level workflow, which has no guid
|
||||
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
|
||||
if process_instance.bpmn_process_id is None:
|
||||
bpmn_process = cls.add_bpmn_process(
|
||||
serializer.workflow_to_dict(
|
||||
spiff_task.workflow._get_outermost_workflow()
|
||||
),
|
||||
process_instance,
|
||||
bpmn_process, new_task_models, new_json_data_models = (
|
||||
cls.add_bpmn_process(
|
||||
serializer.workflow_to_dict(
|
||||
spiff_task.workflow._get_outermost_workflow()
|
||||
),
|
||||
process_instance,
|
||||
)
|
||||
)
|
||||
db.session.commit()
|
||||
else:
|
||||
bpmn_process = BpmnProcessModel.query.filter_by(
|
||||
guid=subprocess_guid
|
||||
).first()
|
||||
if bpmn_process is None:
|
||||
bpmn_process = cls.add_bpmn_process(
|
||||
serializer.workflow_to_dict(subprocess),
|
||||
process_instance,
|
||||
process_instance.bpmn_process,
|
||||
subprocess_guid,
|
||||
bpmn_process, new_task_models, new_json_data_models = (
|
||||
cls.add_bpmn_process(
|
||||
serializer.workflow_to_dict(subprocess),
|
||||
process_instance,
|
||||
process_instance.bpmn_process,
|
||||
subprocess_guid,
|
||||
)
|
||||
)
|
||||
db.session.commit()
|
||||
return bpmn_process
|
||||
return (bpmn_process, new_task_models, new_json_data_models)
|
||||
|
||||
@classmethod
|
||||
def add_bpmn_process(
|
||||
@ -132,10 +147,18 @@ class TaskService:
|
||||
process_instance: ProcessInstanceModel,
|
||||
bpmn_process_parent: Optional[BpmnProcessModel] = None,
|
||||
bpmn_process_guid: Optional[str] = None,
|
||||
) -> BpmnProcessModel:
|
||||
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
|
||||
"""This creates and adds a bpmn_process to the Db session.
|
||||
|
||||
It will also add tasks and relating json_data entries if the bpmn_process is new.
|
||||
It returns tasks and json data records in dictionaries to be added to the session later.
|
||||
"""
|
||||
tasks = bpmn_process_dict.pop("tasks")
|
||||
bpmn_process_data_dict = bpmn_process_dict.pop("data")
|
||||
|
||||
new_task_models = {}
|
||||
new_json_data_models = {}
|
||||
|
||||
bpmn_process = None
|
||||
if bpmn_process_parent is not None:
|
||||
bpmn_process = BpmnProcessModel.query.filter_by(
|
||||
@ -165,13 +188,16 @@ class TaskService:
|
||||
json_data = JsonDataModel(
|
||||
hash=bpmn_process_data_hash, data=bpmn_process_data_dict
|
||||
)
|
||||
db.session.add(json_data)
|
||||
new_json_data_models[bpmn_process_data_hash] = json_data
|
||||
bpmn_process.json_data_hash = bpmn_process_data_hash
|
||||
|
||||
if bpmn_process_parent is None:
|
||||
process_instance.bpmn_process = bpmn_process
|
||||
elif bpmn_process.parent_process_id is None:
|
||||
bpmn_process.parent_process_id = bpmn_process_parent.id
|
||||
|
||||
# Since we bulk insert tasks later we need to add the bpmn_process to the session
|
||||
# to ensure we have an id.
|
||||
db.session.add(bpmn_process)
|
||||
|
||||
if bpmn_process_is_new:
|
||||
@ -194,7 +220,11 @@ class TaskService:
|
||||
task_model.state = TaskStateNames[state_int]
|
||||
task_model.properties_json = task_properties
|
||||
|
||||
TaskService.update_task_data_on_task_model(task_model, task_data_dict)
|
||||
db.session.add(task_model)
|
||||
json_data = TaskService.update_task_data_on_task_model(
|
||||
task_model, task_data_dict
|
||||
)
|
||||
new_task_models[task_model.guid] = task_model
|
||||
if json_data is not None:
|
||||
new_json_data_models[json_data.hash] = json_data
|
||||
|
||||
return bpmn_process
|
||||
return (bpmn_process, new_task_models, new_json_data_models)
|
||||
|
@ -12,6 +12,7 @@ from SpiffWorkflow.task import TaskState
|
||||
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.json_data import JsonDataModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_instance_correlation import (
|
||||
MessageInstanceCorrelationRuleModel,
|
||||
@ -58,6 +59,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
||||
self.process_instance = process_instance
|
||||
|
||||
self.current_task_model: Optional[TaskModel] = None
|
||||
self.task_models: dict[str, TaskModel] = {}
|
||||
self.json_data_models: dict[str, JsonDataModel] = {}
|
||||
self.serializer = serializer
|
||||
|
||||
def should_update_task_model(self) -> bool:
|
||||
@ -66,15 +69,17 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
||||
Use the bpmn_process_id to do this.
|
||||
"""
|
||||
return self.process_instance.bpmn_process_id is not None
|
||||
# return False
|
||||
|
||||
def will_complete_task(self, spiff_task: SpiffTask) -> None:
|
||||
if self.should_update_task_model():
|
||||
self.current_task_model = (
|
||||
_bpmn_process, task_model, new_task_models, new_json_data_models = (
|
||||
TaskService.find_or_create_task_model_from_spiff_task(
|
||||
spiff_task, self.process_instance, self.serializer
|
||||
)
|
||||
)
|
||||
self.current_task_model = task_model
|
||||
self.task_models.update(new_task_models)
|
||||
self.json_data_models.update(new_json_data_models)
|
||||
self.current_task_model.start_in_seconds = time.time()
|
||||
if self.secondary_engine_step_delegate:
|
||||
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
|
||||
@ -82,14 +87,18 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
||||
def did_complete_task(self, spiff_task: SpiffTask) -> None:
|
||||
if self.current_task_model and self.should_update_task_model():
|
||||
self.current_task_model.end_in_seconds = time.time()
|
||||
TaskService.update_task_model_and_add_to_db_session(
|
||||
json_data = TaskService.update_task_model(
|
||||
self.current_task_model, spiff_task, self.serializer
|
||||
)
|
||||
db.session.add(self.current_task_model)
|
||||
if json_data is not None:
|
||||
self.json_data_models[json_data.hash] = json_data
|
||||
self.task_models[self.current_task_model.guid] = self.current_task_model
|
||||
if self.secondary_engine_step_delegate:
|
||||
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
|
||||
|
||||
def save(self, _commit: bool = True) -> None:
|
||||
db.session.bulk_save_objects(self.task_models.values())
|
||||
db.session.bulk_save_objects(self.json_data_models.values())
|
||||
if self.secondary_engine_step_delegate:
|
||||
self.secondary_engine_step_delegate.save(commit=False)
|
||||
db.session.commit()
|
||||
@ -104,17 +113,19 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
||||
| TaskState.MAYBE
|
||||
| TaskState.LIKELY
|
||||
):
|
||||
task_model = TaskModel.query.filter_by(
|
||||
guid=str(waiting_spiff_task.id)
|
||||
).first()
|
||||
if task_model is None:
|
||||
task_model = TaskService.find_or_create_task_model_from_spiff_task(
|
||||
_bpmn_process, task_model, new_task_models, new_json_data_models = (
|
||||
TaskService.find_or_create_task_model_from_spiff_task(
|
||||
waiting_spiff_task, self.process_instance, self.serializer
|
||||
)
|
||||
TaskService.update_task_model_and_add_to_db_session(
|
||||
)
|
||||
self.task_models.update(new_task_models)
|
||||
self.json_data_models.update(new_json_data_models)
|
||||
json_data = TaskService.update_task_model(
|
||||
task_model, waiting_spiff_task, self.serializer
|
||||
)
|
||||
db.session.commit()
|
||||
self.task_models[task_model.guid] = task_model
|
||||
if json_data is not None:
|
||||
self.json_data_models[json_data.hash] = json_data
|
||||
|
||||
|
||||
class StepDetailLoggingDelegate(EngineStepDelegate):
|
||||
|
Loading…
x
Reference in New Issue
Block a user