Data store background fix (#1455)

* added test to demonstrate data store issue when run in certain contexts w/ burnettk

* added method to add kkv data store getters back onto tasks if appropriate w/ burnettk

* removed debug code w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-04-26 14:14:26 +00:00 committed by GitHub
parent c6f0dd65e5
commit 00c2e85fee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 156 additions and 18 deletions

View File

@ -1,17 +1,21 @@
import sys
from spiffworkflow_backend import create_app
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
def main(process_instance_id: str) -> None:
    """Run the given process instance with the background execution strategy.

    Creates the Flask app, looks up the process instance by id, and runs it
    using the engine-step strategy configured for background processing.

    Raises:
        Exception: if no process instance exists with the given id.
    """
    app = create_app()
    with app.app_context():
        # strategy name comes from app config so background runs match deployment settings
        execution_strategy_name = app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"]
        process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
        if process_instance is None:
            # fail loudly instead of letting run_process_instance_with_processor hit a None
            raise Exception(f"Could not find a process instance with id: {process_instance_id}")
        ProcessInstanceService.run_process_instance_with_processor(
            process_instance, execution_strategy_name=execution_strategy_name
        )


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception("Process instance id not supplied")
    main(sys.argv[1])

View File

@ -2,27 +2,53 @@ import json
import sys
from spiffworkflow_backend import create_app
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
def main(process_model_identifier: str, filepath: str, process_instance_id: int | None = None) -> None:
    """Import serialized bpmn-process json into a process instance.

    If process_instance_id is given, the existing instance is targeted (a reset);
    otherwise a fresh process instance is created for the given process model.
    Prints the resulting process instance id on success.

    Raises:
        Exception: if the instance id cannot be found, the instance belongs to a
            different process model, or tasks from the json already exist.
    """
    app = create_app()
    with app.app_context():
        user = UserService.find_or_create_system_user()
        process_model = ProcessModelService.get_process_model(process_model_identifier.replace(":", "/"))
        if process_instance_id is not None:
            process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
            if process_instance is None:
                raise Exception(f"Could not find a process instance with id: {process_instance_id}")
        else:
            process_instance, _ = ProcessInstanceService.create_process_instance(process_model, user)
        # guard against importing json into an instance of an unrelated process model
        if process_instance.process_model_identifier != process_model.id:
            raise Exception(
                f"Process model identifier '{process_model_identifier}' was passed in but process instance "
                f"{process_instance.id} has '{process_instance.process_model_identifier}'"
            )
        with open(filepath) as f:
            bpmn_process_json = f.read()
        bpmn_process_dict = json.loads(bpmn_process_json)
        if process_instance_id is None:
            # sample one task guid to detect whether this json was already imported
            task_guid_sample = list(bpmn_process_dict["tasks"].keys())[0]
            task_model = TaskModel.query.filter_by(guid=task_guid_sample).first()
            if task_model is not None:
                raise Exception(
                    "Tasks already exist in database for given json. "
                    "If you want to reset a process_instance then please pass in its id."
                )
        ProcessInstanceProcessor.persist_bpmn_process_dict(
            bpmn_process_dict, bpmn_definition_to_task_definitions_mappings={}, process_instance_model=process_instance
        )
        print(process_instance.id)


if __name__ == "__main__":
    if len(sys.argv) < 3:
        raise Exception("usage: [script] [process_model_identifier] [bpmn_json_file_path]")
    process_instance_id: int | None = None
    if len(sys.argv) > 3:
        process_instance_id = int(sys.argv[3])
    main(sys.argv[1], sys.argv[2], process_instance_id)

View File

@ -14,7 +14,6 @@ class Version4(DataMigrationBase):
@classmethod
def run(cls, process_instance: ProcessInstanceModel) -> None:
# return None
try:
processor = ProcessInstanceProcessor(
process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True

View File

@ -14,8 +14,6 @@ from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryM
class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
"""KKVDataStore."""
@staticmethod
def create_instance(identifier: str, location: str) -> Any:
return KKVDataStoreModel(
@ -68,9 +66,20 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
"data": data,
}
def get(self, my_task: SpiffTask) -> None:
"""get."""
@classmethod
def add_data_store_getters_to_spiff_task(cls, spiff_task: SpiffTask) -> None:
    """Re-attach kkv data store getter callables to a task's data if needed.

    The getters are methods, and methods get stripped from task data during
    serialization. They are normally inserted into task data when the task is
    marked READY, so they may be missing by the time the task actually runs.
    """
    for association in spiff_task.task_spec.data_input_associations:
        if isinstance(association, KKVDataStore):
            association.get(spiff_task)
def get(self, my_task: SpiffTask) -> None:
def getter(top_level_key: str, secondary_key: str | None) -> Any | None:
location = self.data_store_location_for_task(KKVDataStoreModel, my_task, self.bpmn_id)
store_model: KKVDataStoreModel | None = None
@ -105,7 +114,6 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
my_task.data[self.bpmn_id] = getter
def set(self, my_task: SpiffTask) -> None:
"""set."""
location = self.data_store_location_for_task(KKVDataStoreModel, my_task, self.bpmn_id)
store_model: KKVDataStoreModel | None = None
@ -175,10 +183,7 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
class KKVDataStoreConverter(BpmnConverter): # type: ignore
"""KKVDataStoreConverter."""
def to_dict(self, spec: Any) -> dict[str, Any]:
"""to_dict."""
return {
"bpmn_id": spec.bpmn_id,
"bpmn_name": spec.bpmn_name,
@ -187,5 +192,4 @@ class KKVDataStoreConverter(BpmnConverter): # type: ignore
}
def from_dict(self, dct: dict[str, Any]) -> KKVDataStore:
    """Rebuild a KKVDataStore specification from its serialized dictionary."""
    data_store = KKVDataStore(**dct)
    return data_store

View File

@ -23,6 +23,7 @@ from SpiffWorkflow.util.task import TaskState # type: ignore
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum
from spiffworkflow_backend.models.db import db
@ -220,6 +221,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.spiff_task_timestamps[spiff_task.id] = {"start_in_seconds": time.time(), "end_in_seconds": None}
self.current_task_start_in_seconds = time.time()
KKVDataStore.add_data_store_getters_to_spiff_task(spiff_task)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)

View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_data_store_stuff_knia47k" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_17db3yp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_17db3yp" sourceRef="StartEvent_1" targetRef="the_script_task" />
<bpmn:endEvent id="EndEvent_1">
<bpmn:incoming>Flow_07mzusm</bpmn:incoming>
</bpmn:endEvent>
<bpmn:scriptTask id="the_script_task" name="Script Task">
<bpmn:extensionElements>
<spiffworkflow:instructionsForEndUser># WE INSTRUCT</spiffworkflow:instructionsForEndUser>
</bpmn:extensionElements>
<bpmn:incoming>Flow_17db3yp</bpmn:incoming>
<bpmn:outgoing>Flow_07mzusm</bpmn:outgoing>
<bpmn:property id="Property_0uw7py5" name="__targetRef_placeholder" />
<bpmn:dataInputAssociation id="DataInputAssociation_1sbg045">
<bpmn:sourceRef>DataStoreReference_0f7ad7d</bpmn:sourceRef>
<bpmn:targetRef>Property_0uw7py5</bpmn:targetRef>
</bpmn:dataInputAssociation>
<bpmn:script>hey = the_id</bpmn:script>
</bpmn:scriptTask>
<bpmn:dataStoreReference id="DataStoreReference_0f7ad7d" name="test test kkv" dataStoreRef="the_id" type="kkv" />
<bpmn:sequenceFlow id="Flow_07mzusm" sourceRef="the_script_task" targetRef="EndEvent_1" />
</bpmn:process>
<bpmn:dataStore id="the_id" name="KKVDataStore" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_data_store_stuff_knia47k">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="-38" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0ld5057_di" bpmnElement="the_script_task">
<dc:Bounds x="20" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="DataStoreReference_0f7ad7d_di" bpmnElement="DataStoreReference_0f7ad7d">
<dc:Bounds x="35" y="35" width="50" height="50" />
<bpmndi:BPMNLabel>
<dc:Bounds x="31" y="92" width="59" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_14za570_di" bpmnElement="EndEvent_1">
<dc:Bounds x="172" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_17db3yp_di" bpmnElement="Flow_17db3yp">
<di:waypoint x="-2" y="177" />
<di:waypoint x="20" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="DataInputAssociation_1sbg045_di" bpmnElement="DataInputAssociation_1sbg045">
<di:waypoint x="62" y="85" />
<di:waypoint x="67" y="137" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_07mzusm_di" bpmnElement="Flow_07mzusm">
<di:waypoint x="120" y="177" />
<di:waypoint x="172" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -4,12 +4,16 @@ from typing import Any
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@dataclass
@ -42,7 +46,7 @@ def with_key1_key2_record(with_clean_data_store: KKVDataStoreModel) -> Generator
yield with_clean_data_store
class TestKKVDataStore(BaseTest):
class TestKkvDataStore(BaseTest):
"""Infer from class name."""
def _entry_count(self, model: KKVDataStoreModel) -> int:
@ -159,3 +163,41 @@ class TestKKVDataStore(BaseTest):
kkv_data_store.get(my_task)
result = my_task.data["the_id"]("newKey3", "newKey4")
assert result == "newValue2"
def test_can_retrieve_data_store_from_script_task(
    self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_clean_data_store: KKVDataStoreModel
) -> None:
    """A script task that reads from a kkv data store should run to completion."""
    process_model = load_test_spec(
        process_model_id="simple_data_store",
        bpmn_file_name="data-store-simple.bpmn",
        process_model_source_directory="data_store_simple",
    )
    process_instance = self.create_process_instance_from_process_model(process_model)
    ProcessInstanceProcessor(process_instance).do_engine_steps(save=True, execution_strategy_name="greedy")
    assert process_instance.status == "complete"
def test_can_retrieve_data_store_from_script_task_with_instructions(
    self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None, with_clean_data_store: KKVDataStoreModel
) -> None:
    """Resuming an instance paused at a user message should still reach complete."""
    process_model = load_test_spec(
        process_model_id="simple_data_store",
        bpmn_file_name="data-store-simple.bpmn",
        process_model_source_directory="data_store_simple",
    )
    process_instance = self.create_process_instance_from_process_model(process_model)
    first_processor = ProcessInstanceProcessor(process_instance)
    first_processor.do_engine_steps(save=True, execution_strategy_name="run_until_user_message")
    assert process_instance.status == "running"
    # reload the instance to mimic a fresh background run picking up the paused instance
    process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
    assert process_instance is not None
    second_processor = ProcessInstanceProcessor(process_instance)
    second_processor.do_engine_steps(save=True, execution_strategy_name="greedy")
    assert process_instance.status == "complete"