our main test is passing w/ burnettk

This commit is contained in:
jasquat 2023-03-13 13:49:55 -04:00
parent 3ee866595d
commit 203b16ed5b
No known key found for this signature in database
5 changed files with 61 additions and 37 deletions

View File

@ -2,6 +2,7 @@
select = B,B9,C,D,DAR,E,F,N,RST,S,W select = B,B9,C,D,DAR,E,F,N,RST,S,W
ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320 ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
max-line-length = 120 max-line-length = 120
extend-ignore = E203
max-complexity = 30 max-complexity = 30
docstring-convention = google docstring-convention = google
rst-roles = class,const,func,meth,mod,ref rst-roles = class,const,func,meth,mod,ref

View File

@ -2,6 +2,7 @@
select = B,B9,C,D,DAR,E,F,N,RST,S,W select = B,B9,C,D,DAR,E,F,N,RST,S,W
ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320 ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
max-line-length = 120 max-line-length = 120
extend-ignore = E203
max-complexity = 30 max-complexity = 30
docstring-convention = google docstring-convention = google
rst-roles = class,const,func,meth,mod,ref rst-roles = class,const,func,meth,mod,ref

View File

@ -1084,16 +1084,20 @@ class ProcessInstanceProcessor:
self._add_bpmn_process_definitions(bpmn_spec_dict) self._add_bpmn_process_definitions(bpmn_spec_dict)
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = TaskService.add_bpmn_process( bpmn_process_parent, new_task_models, new_json_data_models = TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model process_instance_data_dict, self.process_instance_model
) )
for subprocess_task_id, subprocess_properties in subprocesses.items(): for subprocess_task_id, subprocess_properties in subprocesses.items():
TaskService.add_bpmn_process( _bpmn_subprocess, subprocess_new_task_models, subprocess_new_json_data_models = TaskService.add_bpmn_process(
subprocess_properties, subprocess_properties,
self.process_instance_model, self.process_instance_model,
bpmn_process_parent, bpmn_process_parent,
bpmn_process_guid=subprocess_task_id, bpmn_process_guid=subprocess_task_id,
) )
new_task_models.update(subprocess_new_task_models)
new_json_data_models.update(subprocess_new_json_data_models)
db.session.bulk_save_objects(new_task_models.values())
db.session.bulk_save_objects(new_json_data_models.values())
def save(self) -> None: def save(self) -> None:
"""Saves the current state of this processor to the database.""" """Saves the current state of this processor to the database."""
@ -1881,9 +1885,11 @@ class ProcessInstanceProcessor:
db.session.add(details_model) db.session.add(details_model)
# ####### # #######
TaskService.update_task_model_and_add_to_db_session( json_data = TaskService.update_task_model_and_add_to_db_session(
task_model, spiff_task, self._serializer task_model, spiff_task, self._serializer
) )
if json_data is not None:
db.session.add(json_data)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save() self.save()

View File

@ -19,9 +19,10 @@ class TaskService:
@classmethod @classmethod
def update_task_data_on_task_model( def update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict cls, task_model: TaskModel, task_data_dict: dict
) -> None: ) -> Optional[JsonDataModel]:
task_data_json = json.dumps(task_data_dict, sort_keys=True) task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest() task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_to_return = None
if task_model.json_data_hash != task_data_hash: if task_model.json_data_hash != task_data_hash:
json_data = ( json_data = (
db.session.query(JsonDataModel.id) db.session.query(JsonDataModel.id)
@ -30,8 +31,9 @@ class TaskService:
) )
if json_data is None: if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict) json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data) json_data_to_return = json_data
task_model.json_data_hash = task_data_hash task_model.json_data_hash = task_data_hash
return json_data_to_return
@classmethod @classmethod
def update_task_model_and_add_to_db_session( def update_task_model_and_add_to_db_session(
@ -39,7 +41,7 @@ class TaskService:
task_model: TaskModel, task_model: TaskModel,
spiff_task: SpiffTask, spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> None: ) -> Optional[JsonDataModel]:
"""Updates properties_json and data on given task_model. """Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds. This will NOT update start_in_seconds or end_in_seconds.
@ -48,8 +50,8 @@ class TaskService:
spiff_task_data = new_properties_json.pop("data") spiff_task_data = new_properties_json.pop("data")
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json["state"]] task_model.state = TaskStateNames[new_properties_json["state"]]
cls.update_task_data_on_task_model(task_model, spiff_task_data) json_data = cls.update_task_data_on_task_model(task_model, spiff_task_data)
db.session.add(task_model) return json_data
@classmethod @classmethod
def find_or_create_task_model_from_spiff_task( def find_or_create_task_model_from_spiff_task(
@ -57,13 +59,16 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> TaskModel: ) -> Tuple[Optional[BpmnProcessModel], TaskModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
spiff_task_guid = str(spiff_task.id) spiff_task_guid = str(spiff_task.id)
task_model: Optional[TaskModel] = TaskModel.query.filter_by( task_model: Optional[TaskModel] = TaskModel.query.filter_by(
guid=spiff_task_guid guid=spiff_task_guid
).first() ).first()
bpmn_process = None
new_task_models: dict[str, TaskModel] = {}
new_json_data_models: dict[str, JsonDataModel] = {}
if task_model is None: if task_model is None:
bpmn_process = cls.task_bpmn_process( bpmn_process, new_task_models, new_json_data_models = cls.task_bpmn_process(
spiff_task, process_instance, serializer spiff_task, process_instance, serializer
) )
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
@ -71,7 +76,7 @@ class TaskService:
task_model = TaskModel( task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
) )
return task_model return (bpmn_process, task_model, new_task_models, new_json_data_models)
@classmethod @classmethod
def task_subprocess( def task_subprocess(
@ -96,34 +101,34 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> BpmnProcessModel: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task) subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None bpmn_process: Optional[BpmnProcessModel] = None
new_task_models: dict[str, TaskModel] = {}
new_json_data_models: dict[str, JsonDataModel] = {}
if subprocess is None: if subprocess is None:
bpmn_process = process_instance.bpmn_process bpmn_process = process_instance.bpmn_process
# This is the top level workflow, which has no guid # This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None: if process_instance.bpmn_process_id is None:
bpmn_process = cls.add_bpmn_process( bpmn_process, new_task_models, new_json_data_models = cls.add_bpmn_process(
serializer.workflow_to_dict( serializer.workflow_to_dict(
spiff_task.workflow._get_outermost_workflow() spiff_task.workflow._get_outermost_workflow()
), ),
process_instance, process_instance,
) )
db.session.commit()
else: else:
bpmn_process = BpmnProcessModel.query.filter_by( bpmn_process = BpmnProcessModel.query.filter_by(
guid=subprocess_guid guid=subprocess_guid
).first() ).first()
if bpmn_process is None: if bpmn_process is None:
bpmn_process = cls.add_bpmn_process( bpmn_process, new_task_models, new_json_data_models = cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess), serializer.workflow_to_dict(subprocess),
process_instance, process_instance,
process_instance.bpmn_process, process_instance.bpmn_process,
subprocess_guid, subprocess_guid,
) )
db.session.commit() return (bpmn_process, new_task_models, new_json_data_models)
return bpmn_process
@classmethod @classmethod
def add_bpmn_process( def add_bpmn_process(
@ -132,7 +137,7 @@ class TaskService:
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
) -> BpmnProcessModel: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataModel]]:
tasks = bpmn_process_dict.pop("tasks") tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data_dict = bpmn_process_dict.pop("data") bpmn_process_data_dict = bpmn_process_dict.pop("data")
@ -174,6 +179,8 @@ class TaskService:
bpmn_process.parent_process_id = bpmn_process_parent.id bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process) db.session.add(bpmn_process)
new_task_models = {}
new_json_data_models = {}
if bpmn_process_is_new: if bpmn_process_is_new:
for task_id, task_properties in tasks.items(): for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data") task_data_dict = task_properties.pop("data")
@ -194,7 +201,9 @@ class TaskService:
task_model.state = TaskStateNames[state_int] task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties task_model.properties_json = task_properties
TaskService.update_task_data_on_task_model(task_model, task_data_dict) json_data = TaskService.update_task_data_on_task_model(task_model, task_data_dict)
db.session.add(task_model) new_task_models[task_model.guid] = task_model
if json_data is not None:
new_json_data_models[json_data.hash] = json_data
return bpmn_process return (bpmn_process, new_task_models, new_json_data_models)

View File

@ -12,6 +12,7 @@ from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel, MessageInstanceCorrelationRuleModel,
@ -58,6 +59,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.process_instance = process_instance self.process_instance = process_instance
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.task_models: dict[str, TaskModel] = {}
self.json_data_models: dict[str, JsonDataModel] = {}
self.serializer = serializer self.serializer = serializer
def should_update_task_model(self) -> bool: def should_update_task_model(self) -> bool:
@ -66,15 +69,15 @@ class TaskModelSavingDelegate(EngineStepDelegate):
Use the bpmn_process_id to do this. Use the bpmn_process_id to do this.
""" """
return self.process_instance.bpmn_process_id is not None return self.process_instance.bpmn_process_id is not None
# return False
def will_complete_task(self, spiff_task: SpiffTask) -> None: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model(): if self.should_update_task_model():
self.current_task_model = ( _bpmn_process, task_model, new_task_models, new_json_data_models = TaskService.find_or_create_task_model_from_spiff_task(
TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer spiff_task, self.process_instance, self.serializer
) )
) self.current_task_model = task_model
self.task_models.update(new_task_models)
self.json_data_models.update(new_json_data_models)
self.current_task_model.start_in_seconds = time.time() self.current_task_model.start_in_seconds = time.time()
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task) self.secondary_engine_step_delegate.will_complete_task(spiff_task)
@ -82,14 +85,18 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def did_complete_task(self, spiff_task: SpiffTask) -> None: def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model(): if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time() self.current_task_model.end_in_seconds = time.time()
TaskService.update_task_model_and_add_to_db_session( json_data = TaskService.update_task_model_and_add_to_db_session(
self.current_task_model, spiff_task, self.serializer self.current_task_model, spiff_task, self.serializer
) )
db.session.add(self.current_task_model) if json_data is not None:
self.json_data_models[json_data.hash] = json_data
self.task_models[self.current_task_model.guid] = self.current_task_model
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task) self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, _commit: bool = True) -> None: def save(self, _commit: bool = True) -> None:
db.session.bulk_save_objects(self.task_models.values())
db.session.bulk_save_objects(self.json_data_models.values())
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(commit=False) self.secondary_engine_step_delegate.save(commit=False)
db.session.commit() db.session.commit()
@ -104,17 +111,17 @@ class TaskModelSavingDelegate(EngineStepDelegate):
| TaskState.MAYBE | TaskState.MAYBE
| TaskState.LIKELY | TaskState.LIKELY
): ):
task_model = TaskModel.query.filter_by( _bpmn_process, task_model, new_task_models, new_json_data_models = TaskService.find_or_create_task_model_from_spiff_task(
guid=str(waiting_spiff_task.id)
).first()
if task_model is None:
task_model = TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer waiting_spiff_task, self.process_instance, self.serializer
) )
TaskService.update_task_model_and_add_to_db_session( self.task_models.update(new_task_models)
self.json_data_models.update(new_json_data_models)
json_data = TaskService.update_task_model_and_add_to_db_session(
task_model, waiting_spiff_task, self.serializer task_model, waiting_spiff_task, self.serializer
) )
db.session.commit() self.task_models[task_model.guid] = task_model
if json_data is not None:
self.json_data_models[json_data.hash] = json_data
class StepDetailLoggingDelegate(EngineStepDelegate): class StepDetailLoggingDelegate(EngineStepDelegate):