KKV Revamp (#968)

jbirddog 2024-02-07 09:01:52 -05:00 committed by GitHub
parent 5ae59367cb
commit 305a6fb67a
16 changed files with 407 additions and 123 deletions

@@ -0,0 +1,47 @@
"""empty message

Revision ID: 60c13a48d675
Revises: 29b261f5edf4
Create Date: 2024-02-01 08:43:01.666683

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = '60c13a48d675'
down_revision = '29b261f5edf4'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
    #     batch_op.drop_index('ix_kkv_data_store_secondary_key')
    #     batch_op.drop_index('ix_kkv_data_store_top_level_key')
    op.drop_table('kkv_data_store')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('kkv_data_store',
        sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
        sa.Column('top_level_key', mysql.VARCHAR(length=255), nullable=False),
        sa.Column('secondary_key', mysql.VARCHAR(length=255), nullable=False),
        sa.Column('value', mysql.JSON(), nullable=False),
        sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=False),
        sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint('id'),
        mysql_collate='utf8mb4_0900_ai_ci',
        mysql_default_charset='utf8mb4',
        mysql_engine='InnoDB'
    )
    with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
        batch_op.create_index('ix_kkv_data_store_top_level_key', ['top_level_key'], unique=False)
        batch_op.create_index('ix_kkv_data_store_secondary_key', ['secondary_key'], unique=False)
    # ### end Alembic commands ###

@@ -0,0 +1,70 @@
"""empty message

Revision ID: 6344d90d20fa
Revises: 60c13a48d675
Create Date: 2024-02-02 08:33:49.669497

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '6344d90d20fa'
down_revision = '60c13a48d675'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('kkv_data_store',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('identifier', sa.String(length=255), nullable=False),
        sa.Column('location', sa.String(length=255), nullable=False),
        sa.Column('schema', sa.JSON(), nullable=False),
        sa.Column('description', sa.String(length=255), nullable=True),
        sa.Column('updated_at_in_seconds', sa.Integer(), nullable=False),
        sa.Column('created_at_in_seconds', sa.Integer(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('identifier', 'location', name='_kkv_identifier_location_unique')
    )
    with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
        batch_op.create_index(batch_op.f('ix_kkv_data_store_identifier'), ['identifier'], unique=False)
        batch_op.create_index(batch_op.f('ix_kkv_data_store_name'), ['name'], unique=False)

    op.create_table('kkv_data_store_entry',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('kkv_data_store_id', sa.Integer(), nullable=False),
        sa.Column('top_level_key', sa.String(length=255), nullable=False),
        sa.Column('secondary_key', sa.String(length=255), nullable=False),
        sa.Column('value', sa.JSON(), nullable=False),
        sa.Column('updated_at_in_seconds', sa.Integer(), nullable=False),
        sa.Column('created_at_in_seconds', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['kkv_data_store_id'], ['kkv_data_store.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('kkv_data_store_id', 'top_level_key', 'secondary_key', name='_instance_keys_unique')
    )
    with op.batch_alter_table('kkv_data_store_entry', schema=None) as batch_op:
        batch_op.create_index(batch_op.f('ix_kkv_data_store_entry_kkv_data_store_id'), ['kkv_data_store_id'], unique=False)
        batch_op.create_index(batch_op.f('ix_kkv_data_store_entry_secondary_key'), ['secondary_key'], unique=False)
        batch_op.create_index(batch_op.f('ix_kkv_data_store_entry_top_level_key'), ['top_level_key'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('kkv_data_store_entry', schema=None) as batch_op:
        batch_op.drop_index(batch_op.f('ix_kkv_data_store_entry_top_level_key'))
        batch_op.drop_index(batch_op.f('ix_kkv_data_store_entry_secondary_key'))
        batch_op.drop_index(batch_op.f('ix_kkv_data_store_entry_kkv_data_store_id'))

    op.drop_table('kkv_data_store_entry')
    with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
        batch_op.drop_index(batch_op.f('ix_kkv_data_store_name'))
        batch_op.drop_index(batch_op.f('ix_kkv_data_store_identifier'))

    op.drop_table('kkv_data_store')
    # ### end Alembic commands ###
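
Taken together, the two migrations replace the single flat kkv_data_store table with a store/entry pair: kkv_data_store now describes a named, located store with a JSON schema, and kkv_data_store_entry holds the actual key/key/value rows. A minimal sketch of seeding the new layout, with illustrative values and assuming a Flask app context:

    from spiffworkflow_backend.models.db import db
    from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
    from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel

    # Illustrative data; (identifier, location) pairs must be unique per
    # the _kkv_identifier_location_unique constraint above.
    store = KKVDataStoreModel(identifier="cities", name="Cities", location="misc/data", schema={})
    db.session.add(store)
    db.session.commit()

    entry = KKVDataStoreEntryModel(
        kkv_data_store_id=store.id,
        top_level_key="NY",
        secondary_key="Albany",
        value={"population": 99224},
    )
    db.session.add(entry)
    db.session.commit()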

@@ -2821,7 +2821,7 @@ paths:
       responses:
         "200":
           description: The list of currently defined data store types
-  /data-stores/{data_store_type}/{name}/items:
+  /data-stores/{data_store_type}/{identifier}/items:
     parameters:
       - name: data_store_type
         in: path
@@ -2829,10 +2829,16 @@ paths:
         description: The type of datastore, such as "typeahead"
         schema:
           type: string
-      - name: name
+      - name: identifier
         in: path
         required: true
-        description: The name of the datastore, such as "cities"
+        description: The identifier of the datastore, such as "cities"
+        schema:
+          type: string
+      - name: location
+        in: query
+        required: false
+        description: The location of the data store
         schema:
           type: string
       - name: page
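
The practical effect of the API change: items are now fetched by data store identifier rather than name, and the optional location query parameter disambiguates stores that share an identifier across process groups. A hypothetical request against the new route (base URL, identifier, and location are placeholders, not values from this commit):

    import requests

    # Placeholders throughout; adjust the base URL for a real deployment.
    response = requests.get(
        "http://localhost:7000/v1.0/data-stores/kkv/cities/items",
        params={"location": "misc/data", "page": 1, "per_page": 10},
    )
    print(response.json())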

@@ -1,5 +1,18 @@
 from typing import Any

+from flask import current_app
+from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
+
+from spiffworkflow_backend.services.upsearch_service import UpsearchService
+
+
+class DataStoreReadError(Exception):
+    pass
+
+
+class DataStoreWriteError(Exception):
+    pass
+
+
 class DataStoreCRUD:
     @staticmethod
@@ -33,3 +46,29 @@ class DataStoreCRUD:
     @staticmethod
     def delete_record(name: str, data: dict[str, Any]) -> None:
         raise Exception("must implement")
+
+    @staticmethod
+    def process_model_location_for_task(spiff_task: SpiffTask) -> str | None:
+        tld = current_app.config.get("THREAD_LOCAL_DATA")
+        if tld and hasattr(tld, "process_model_identifier"):
+            return tld.process_model_identifier  # type: ignore
+        return None
+
+    @classmethod
+    def data_store_location_for_task(cls, model: Any, spiff_task: SpiffTask, identifier: str) -> str | None:
+        location = cls.process_model_location_for_task(spiff_task)
+        if location is None:
+            return None
+        locations = UpsearchService.upsearch_locations(location)
+        model = (
+            model.query.filter_by(identifier=identifier)
+            .filter(model.location.in_(locations))
+            .order_by(model.location.desc())
+            .first()
+        )
+        if model is None:
+            return None
+        return model.location  # type: ignore
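
These two helpers now live on DataStoreCRUD so any model-backed data store can share them: the first reads the current process model identifier from thread-local data, and the second upsearches from that location for the nearest matching store, preferring the most specific match via order_by(model.location.desc()). A hypothetical re-implementation of the upsearch walk, assuming UpsearchService.upsearch_locations behaves the way its name suggests:

    def upsearch_locations_sketch(location: str) -> list[str]:
        # Yield the location itself, then each ancestor, ending at the root ("").
        locations = []
        while location != "":
            locations.append(location)
            location = "/".join(location.split("/")[:-1])
        locations.append("")
        return locations

    assert upsearch_locations_sketch("misc/group/model") == [
        "misc/group/model",
        "misc/group",
        "misc",
        "",
    ]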

@@ -1,32 +1,17 @@
 from typing import Any

 import jsonschema  # type: ignore
-from flask import current_app
 from SpiffWorkflow.bpmn.serializer.helpers import BpmnConverter  # type: ignore
 from SpiffWorkflow.bpmn.specs.data_spec import BpmnDataStoreSpecification  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore

 from spiffworkflow_backend.data_stores.crud import DataStoreCRUD
+from spiffworkflow_backend.data_stores.crud import DataStoreReadError
+from spiffworkflow_backend.data_stores.crud import DataStoreWriteError
 from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.json_data_store import JSONDataStoreModel
 from spiffworkflow_backend.services.file_system_service import FileSystemService
 from spiffworkflow_backend.services.reference_cache_service import ReferenceCacheService
-from spiffworkflow_backend.services.upsearch_service import UpsearchService
-
-
-class DataStoreReadError(Exception):
-    pass
-
-
-class DataStoreWriteError(Exception):
-    pass
-
-
-def _process_model_location_for_task(spiff_task: SpiffTask) -> str | None:
-    tld = current_app.config.get("THREAD_LOCAL_DATA")
-    if tld and hasattr(tld, "process_model_identifier"):
-        return tld.process_model_identifier  # type: ignore
-    return None
-
-
 class JSONDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
@@ -48,12 +33,12 @@ class JSONDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
     def existing_data_stores(process_group_identifier: str | None = None) -> list[dict[str, Any]]:
         data_stores = []

-        query = db.session.query(JSONDataStoreModel.name, JSONDataStoreModel.identifier)
+        query = db.session.query(JSONDataStoreModel.name, JSONDataStoreModel.identifier, JSONDataStoreModel.location)
         if process_group_identifier is not None:
             query = query.filter_by(location=process_group_identifier)
-        keys = query.distinct().order_by(JSONDataStoreModel.name).all()  # type: ignore
+        keys = query.order_by(JSONDataStoreModel.name).all()
         for key in keys:
-            data_stores.append({"name": key[0], "type": "json", "id": key[1], "clz": "JSONDataStore"})
+            data_stores.append({"name": key[0], "type": "json", "id": key[1], "clz": "JSONDataStore", "location": key[2]})

         return data_stores
@@ -68,26 +53,28 @@ class JSONDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
     @staticmethod
     def build_response_item(model: Any) -> dict[str, Any]:
-        return {"location": model.location, "identifier": model.identifier, "data": model.data}
+        return {"data": model.data}

     def get(self, my_task: SpiffTask) -> None:
         """get."""
         model: JSONDataStoreModel | None = None
-        location = self._data_store_location_for_task(my_task, self.bpmn_id)
+        location = self.data_store_location_for_task(JSONDataStoreModel, my_task, self.bpmn_id)
         if location is not None:
             model = db.session.query(JSONDataStoreModel).filter_by(identifier=self.bpmn_id, location=location).first()
         if model is None:
             raise DataStoreReadError(f"Unable to read from data store '{self.bpmn_id}' using location '{location}'.")
         my_task.data[self.bpmn_id] = model.data

     def set(self, my_task: SpiffTask) -> None:
         """set."""
         model: JSONDataStoreModel | None = None
-        location = self._data_store_location_for_task(my_task, self.bpmn_id)
+        location = self.data_store_location_for_task(JSONDataStoreModel, my_task, self.bpmn_id)
         if location is not None:
             model = JSONDataStoreModel.query.filter_by(identifier=self.bpmn_id, location=location).first()
-        if location is None or model is None:
+        if model is None:
             raise DataStoreWriteError(f"Unable to write to data store '{self.bpmn_id}' using location '{location}'.")
         data = my_task.data[self.bpmn_id]
@@ -105,24 +92,6 @@ class JSONDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
         db.session.commit()
         del my_task.data[self.bpmn_id]

-    def _data_store_location_for_task(self, spiff_task: SpiffTask, identifier: str) -> str | None:
-        location = _process_model_location_for_task(spiff_task)
-        if location is None:
-            return None
-        locations = UpsearchService.upsearch_locations(location)
-        model = (
-            JSONDataStoreModel.query.filter_by(identifier=identifier)
-            .filter(JSONDataStoreModel.location.in_(locations))  # type: ignore
-            .order_by(JSONDataStoreModel.location.desc())  # type: ignore
-            .first()
-        )
-        if model is None:
-            return None
-        return model.location  # type: ignore
-
     @staticmethod
     def register_data_store_class(data_store_classes: dict[str, Any]) -> None:
         data_store_classes["JSONDataStore"] = JSONDataStore
@@ -166,7 +135,7 @@ class JSONFileDataStore(BpmnDataStoreSpecification):  # type: ignore
         del my_task.data[self.bpmn_id]

     def _data_store_location_for_task(self, spiff_task: SpiffTask, identifier: str) -> str | None:
-        location = _process_model_location_for_task(spiff_task)
+        location = DataStoreCRUD.process_model_location_for_task(spiff_task)
         if location is None:
             return None
         if self._data_store_exists_at_location(location, identifier):

@@ -1,63 +1,92 @@
 from typing import Any

+import jsonschema  # type: ignore
 from SpiffWorkflow.bpmn.serializer.helpers import BpmnConverter  # type: ignore
 from SpiffWorkflow.bpmn.specs.data_spec import BpmnDataStoreSpecification  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore

 from spiffworkflow_backend.data_stores.crud import DataStoreCRUD
+from spiffworkflow_backend.data_stores.crud import DataStoreReadError
+from spiffworkflow_backend.data_stores.crud import DataStoreWriteError
 from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
+from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel


 class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
     """KKVDataStore."""

     @staticmethod
-    def existing_data_stores(process_group_identifier: str | None = None) -> list[dict[str, Any]]:
-        data_stores: list[dict[str, Any]] = []
-
-        if process_group_identifier is not None:
-            # temporary until this data store gets location support
-            return data_stores
-
-        keys = (
-            db.session.query(KKVDataStoreModel.top_level_key)
-            .distinct()  # type: ignore
-            .order_by(KKVDataStoreModel.top_level_key)
-            .all()
-        )
-        for key in keys:
-            data_stores.append({"name": key[0], "type": "kkv", "id": "", "clz": "KKVDataStore"})
+    def create_instance(identifier: str, location: str) -> Any:
+        return KKVDataStoreModel(
+            identifier=identifier,
+            location=location,
+        )
+
+    @staticmethod
+    def existing_instance(identifier: str, location: str) -> Any:
+        return db.session.query(KKVDataStoreModel).filter_by(identifier=identifier, location=location).first()
+
+    @staticmethod
+    def existing_data_stores(process_group_identifier: str | None = None) -> list[dict[str, Any]]:
+        data_stores = []
+
+        query = db.session.query(KKVDataStoreModel)
+        if process_group_identifier is not None:
+            query = query.filter_by(location=process_group_identifier)
+        models = query.order_by(KKVDataStoreModel.name).all()
+        for model in models:
+            data_stores.append(
+                {"name": model.name, "type": "kkv", "id": model.identifier, "clz": "KKVDataStore", "location": model.location}
+            )

         return data_stores

     @staticmethod
-    def get_data_store_query(name: str, process_group_identifier: str | None) -> Any:
-        return KKVDataStoreModel.query.filter_by(top_level_key=name).order_by(
-            KKVDataStoreModel.top_level_key, KKVDataStoreModel.secondary_key
-        )
+    def get_data_store_query(identifier: str, process_group_identifier: str | None) -> Any:
+        query = KKVDataStoreModel.query
+        if process_group_identifier is not None:
+            query = query.filter_by(identifier=identifier, location=process_group_identifier)
+        else:
+            query = query.filter_by(name=identifier)
+        return query.order_by(KKVDataStoreModel.name)

     @staticmethod
     def build_response_item(model: Any) -> dict[str, Any]:
-        return {
-            "secondary_key": model.secondary_key,
-            "value": model.value,
-        }
-
-    def _get_model(self, top_level_key: str, secondary_key: str) -> KKVDataStoreModel | None:
-        model = db.session.query(KKVDataStoreModel).filter_by(top_level_key=top_level_key, secondary_key=secondary_key).first()
-        return model
-
-    def _delete_all_for_top_level_key(self, top_level_key: str) -> None:
-        models = db.session.query(KKVDataStoreModel).filter_by(top_level_key=top_level_key).all()
-        for model in models:
-            db.session.delete(model)
+        data = []
+
+        for entry in model.entries:
+            data.append(
+                {
+                    "top_level_key": entry.top_level_key,
+                    "secondary_key": entry.secondary_key,
+                    "value": entry.value,
+                }
+            )
+
+        return {
+            "data": data,
+        }

     def get(self, my_task: SpiffTask) -> None:
         """get."""

         def getter(top_level_key: str, secondary_key: str) -> Any | None:
-            model = self._get_model(top_level_key, secondary_key)
+            location = self.data_store_location_for_task(KKVDataStoreModel, my_task, self.bpmn_id)
+            store_model: KKVDataStoreModel | None = None
+            if location is not None:
+                store_model = db.session.query(KKVDataStoreModel).filter_by(identifier=self.bpmn_id, location=location).first()
+            if store_model is None:
+                raise DataStoreReadError(f"Unable to locate kkv data store '{self.bpmn_id}'.")
+            model = (
+                db.session.query(KKVDataStoreEntryModel)
+                .filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key, secondary_key=secondary_key)
+                .first()
+            )
             if model is not None:
                 return model.value
             return None
@@ -66,29 +95,63 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD):  # type: ignore
     def set(self, my_task: SpiffTask) -> None:
         """set."""
+        location = self.data_store_location_for_task(KKVDataStoreModel, my_task, self.bpmn_id)
+        store_model: KKVDataStoreModel | None = None
+        if location is not None:
+            store_model = db.session.query(KKVDataStoreModel).filter_by(identifier=self.bpmn_id, location=location).first()
+        if store_model is None:
+            raise DataStoreWriteError(f"Unable to locate kkv data store '{self.bpmn_id}'.")
         data = my_task.data[self.bpmn_id]
         if not isinstance(data, dict):
-            raise Exception(
+            raise DataStoreWriteError(
                 f"When writing to this data store, a dictionary is expected as the value for variable '{self.bpmn_id}'"
             )
         for top_level_key, second_level in data.items():
             if second_level is None:
-                self._delete_all_for_top_level_key(top_level_key)
+                models = (
+                    db.session.query(KKVDataStoreEntryModel)
+                    .filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key)
+                    .all()
+                )
+                for model_to_delete in models:
+                    db.session.delete(model_to_delete)
                 continue
             if not isinstance(second_level, dict):
-                raise Exception(
+                raise DataStoreWriteError(
                     "When writing to this data store, a dictionary is expected as the value for"
                     f" '{self.bpmn_id}[\"{top_level_key}\"]'"
                 )
             for secondary_key, value in second_level.items():
-                model = self._get_model(top_level_key, secondary_key)
+                model = (
+                    db.session.query(KKVDataStoreEntryModel)
+                    .filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key, secondary_key=secondary_key)
+                    .first()
+                )
                 if model is None and value is None:
                     continue
                 if value is None:
                     db.session.delete(model)
                     continue
+                try:
+                    jsonschema.validate(instance=value, schema=store_model.schema)
+                except jsonschema.exceptions.ValidationError as e:
+                    raise DataStoreWriteError(
+                        f"Attempting to write data that does not match the provided schema for '{self.bpmn_id}': {e}"
+                    ) from e
                 if model is None:
-                    model = KKVDataStoreModel(top_level_key=top_level_key, secondary_key=secondary_key, value=value)
+                    model = KKVDataStoreEntryModel(
+                        kkv_data_store_id=store_model.id,
+                        top_level_key=top_level_key,
+                        secondary_key=secondary_key,
+                        value=value,
+                    )
                 else:
                     model.value = value
                 db.session.add(model)
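
The task-data contract for writes keeps its old shape: the value bound to the store's BPMN id must be a dict of top-level keys to dicts of secondary keys, with None acting as a delete at either level; what is new is that each value is validated against the store's JSON schema before being persisted. A sketch of the write shape, using illustrative keys and data:

    # Illustrative value assigned to my_task.data["my_store"] before KKVDataStore.set runs.
    value_for_my_store = {
        "NY": {
            "Albany": {"population": 99224},  # upserts this entry
            "Utica": None,  # deletes this entry if present
        },
        "TX": None,  # deletes every entry under "TX"
    }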

@@ -91,6 +91,9 @@ from spiffworkflow_backend.models.json_data_store import (
 from spiffworkflow_backend.models.kkv_data_store import (
     KKVDataStoreModel,
 )  # noqa: F401
+from spiffworkflow_backend.models.kkv_data_store_entry import (
+    KKVDataStoreEntryModel,
+)  # noqa: F401
 from spiffworkflow_backend.models.task_draft_data import (
     TaskDraftDataModel,
 )  # noqa: F401

@@ -1,16 +1,28 @@
 from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from sqlalchemy import UniqueConstraint
+from sqlalchemy.orm import relationship

 from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
 from spiffworkflow_backend.models.db import db

+if TYPE_CHECKING:
+    from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel  # noqa: F401


 @dataclass
 class KKVDataStoreModel(SpiffworkflowBaseDBModel):
     __tablename__ = "kkv_data_store"
+    __table_args__ = (UniqueConstraint("identifier", "location", name="_kkv_identifier_location_unique"),)

     id: int = db.Column(db.Integer, primary_key=True)
-    top_level_key: str = db.Column(db.String(255), nullable=False, index=True)
-    secondary_key: str = db.Column(db.String(255), nullable=False, index=True)
-    value: dict = db.Column(db.JSON, nullable=False)
+    name: str = db.Column(db.String(255), index=True, nullable=False)
+    identifier: str = db.Column(db.String(255), index=True, nullable=False)
+    location: str = db.Column(db.String(255), nullable=False)
+    schema: dict = db.Column(db.JSON, nullable=False)
+    description: str = db.Column(db.String(255))
     updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
     created_at_in_seconds: int = db.Column(db.Integer, nullable=False)
+
+    entries = relationship("KKVDataStoreEntryModel", cascade="delete")

@@ -0,0 +1,25 @@
from dataclasses import dataclass

from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import relationship

from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel


@dataclass
class KKVDataStoreEntryModel(SpiffworkflowBaseDBModel):
    __tablename__ = "kkv_data_store_entry"
    __table_args__ = (UniqueConstraint("kkv_data_store_id", "top_level_key", "secondary_key", name="_instance_keys_unique"),)

    id: int = db.Column(db.Integer, primary_key=True)
    kkv_data_store_id: int = db.Column(ForeignKey(KKVDataStoreModel.id), nullable=False, index=True)  # type: ignore
    top_level_key: str = db.Column(db.String(255), nullable=False, index=True)
    secondary_key: str = db.Column(db.String(255), nullable=False, index=True)
    value: dict = db.Column(db.JSON, nullable=False)
    updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
    created_at_in_seconds: int = db.Column(db.Integer, nullable=False)

    instance = relationship("KKVDataStoreModel", back_populates="entries")
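
A short usage sketch of the relationship pair, with hypothetical data and assuming an app context: model.entries on the store side is what feeds build_response_item above, and cascade="delete" removes a store's entries along with the store itself.

    from spiffworkflow_backend.models.db import db
    from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel

    # Hypothetical store fetched by its unique (identifier, location) pair.
    store = KKVDataStoreModel.query.filter_by(identifier="cities", location="misc/data").first()
    if store is not None:
        for entry in store.entries:  # KKVDataStoreEntryModel rows
            print(entry.top_level_key, entry.secondary_key, entry.value)
        db.session.delete(store)  # cascade="delete" also removes its entries
        db.session.commit()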

@@ -35,15 +35,19 @@ def data_store_list(process_group_identifier: str | None = None, page: int = 1,
 def data_store_types() -> flask.wrappers.Response:
     """Returns a list of the types of available data stores."""

-    # this if == "json" check is temporary while we roll out support for other data stores
+    # this if != "typeahead" check is temporary while we roll out support for other data stores
     # being created with locations, identifiers and schemas
-    data_store_types = [{"type": k, "name": v[0].__name__, "description": v[1]} for k, v in DATA_STORES.items() if k == "json"]
+    data_store_types = [
+        {"type": k, "name": v[0].__name__, "description": v[1]} for k, v in DATA_STORES.items() if k != "typeahead"
+    ]

     return make_response(jsonify(data_store_types), 200)


-def _build_response(data_store_class: Any, name: str, page: int, per_page: int) -> flask.wrappers.Response:
-    data_store_query = data_store_class.get_data_store_query(name, None)
+def _build_response(
+    data_store_class: Any, identifier: str, location: str | None, page: int, per_page: int
+) -> flask.wrappers.Response:
+    data_store_query = data_store_class.get_data_store_query(identifier, location)
     data = data_store_query.paginate(page=page, per_page=per_page, error_out=False)
     results = []
     for item in data.items:
@@ -60,14 +64,16 @@ def _build_response(data_store_class: Any, name: str, page: int, per_page: int)
     return make_response(jsonify(response_json), 200)


-def data_store_item_list(data_store_type: str, name: str, page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
+def data_store_item_list(
+    data_store_type: str, identifier: str, location: str | None = None, page: int = 1, per_page: int = 100
+) -> flask.wrappers.Response:
     """Returns a list of the items in a data store."""
     if data_store_type not in DATA_STORES:
         raise ApiError("unknown_data_store", f"Unknown data store type: {data_store_type}", status_code=400)

     data_store_class, _ = DATA_STORES[data_store_type]

-    return _build_response(data_store_class, name, page, per_page)
+    return _build_response(data_store_class, identifier, location, page, per_page)


 def data_store_create(body: dict) -> flask.wrappers.Response:

@@ -1,7 +1,9 @@
 from typing import Any

+from spiffworkflow_backend.data_stores.crud import DataStoreCRUD
 from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
+from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel
 from spiffworkflow_backend.models.script_attributes_context import ScriptAttributesContext
 from spiffworkflow_backend.scripts.script import Script
@@ -19,9 +21,23 @@ class CreateUniqueKKVTopLevelKey(Script):
         )

     def run(self, script_attributes_context: ScriptAttributesContext, *args: Any, **kwargs: Any) -> Any:
-        top_level_key_prefix = args[0]
+        identifier = args[0]
+        top_level_key_prefix = args[1]
+
+        spiff_task = script_attributes_context.task
+        location: str | None = None

-        model = KKVDataStoreModel(top_level_key="", secondary_key="", value={})
+        if identifier is not None and spiff_task is not None:
+            location = DataStoreCRUD.data_store_location_for_task(KKVDataStoreModel, spiff_task, identifier)
+
+        store_model: KKVDataStoreModel | None = None
+        if location is not None:
+            store_model = db.session.query(KKVDataStoreModel).filter_by(identifier=identifier, location=location).first()
+        if store_model is None:
+            raise Exception(f"Could not find KKV data store with the identifier '{identifier}'")
+
+        model = KKVDataStoreEntryModel(kkv_data_store_id=store_model.id, top_level_key="", secondary_key="", value={})
         db.session.add(model)
         db.session.commit()
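
With the revamp, the script takes the data store identifier first and the key prefix second, and it fails loudly if no matching store is reachable from the calling process model's location. A hypothetical script-task usage (the identifier and prefix are examples; the script presumably still returns the generated unique top-level key, as it did before this commit):

    unique_key = create_unique_kkv_top_level_key("my_store", "order_")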

@@ -7,6 +7,7 @@ from flask.app import Flask
 from spiffworkflow_backend.data_stores.kkv import KKVDataStore
 from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
+from spiffworkflow_backend.models.kkv_data_store_entry import KKVDataStoreEntryModel
 from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@@ -17,23 +18,36 @@ class MockTask:
 @pytest.fixture()
-def with_clean_data_store(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]:
+def with_clean_data_store(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[KKVDataStoreModel, None, None]:
+    app.config["THREAD_LOCAL_DATA"].process_model_identifier = "the_location/of/some/process-model"
+
     db.session.query(KKVDataStoreModel).delete()
     db.session.commit()
-    yield
+
+    model = KKVDataStoreModel(identifier="the_id", name="the_name", location="the_location", schema={})
+    db.session.add(model)
+    db.session.commit()
+
+    yield model


 @pytest.fixture()
-def with_key1_key2_record(with_clean_data_store: None) -> Generator[None, None, None]:
-    model = KKVDataStoreModel(top_level_key="key1", secondary_key="key2", value="value1")  # type: ignore
+def with_key1_key2_record(with_clean_data_store: KKVDataStoreModel) -> Generator[KKVDataStoreModel, None, None]:
+    model = KKVDataStoreEntryModel(
+        kkv_data_store_id=with_clean_data_store.id, top_level_key="key1", secondary_key="key2", value={"key": "value"}
+    )
     db.session.add(model)
     db.session.commit()
-    yield
+
+    yield with_clean_data_store


 class TestKKVDataStore(BaseTest):
     """Infer from class name."""

+    def _entry_count(self, model: KKVDataStoreModel) -> int:
+        return db.session.query(KKVDataStoreEntryModel).filter_by(kkv_data_store_id=model.id).count()
+
     def test_returns_none_if_no_records_exist(self, with_clean_data_store: None) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={})
@@ -48,7 +62,7 @@ class TestKKVDataStore(BaseTest):
         kkv_data_store.get(my_task)
         assert len(my_task.data) == 1
         result = my_task.data["the_id"]("key1", "key2")
-        assert result == "value1"
+        assert result == {"key": "value"}

     def test_returns_none_if_first_key_does_not_match(self, with_key1_key2_record: None) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
@@ -66,25 +80,25 @@ class TestKKVDataStore(BaseTest):
         result = my_task.data["the_id"]("key1", "key22")
         assert result is None

-    def test_can_insert_a_value(self, with_clean_data_store: None) -> None:
+    def test_can_insert_a_value(self, with_clean_data_store: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}}})
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 1

-    def test_can_insert_mulitple_values_for_same_top_key(self, with_clean_data_store: None) -> None:
+    def test_can_insert_multiple_values_for_same_top_key(self, with_clean_data_store: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue", "newKey3": "newValue2"}}})
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 2

-    def test_can_insert_mulitple_values_for_different_top_key(self, with_clean_data_store: None) -> None:
+    def test_can_insert_multiple_values_for_different_top_key(self, with_clean_data_store: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 2

     def test_value_is_removed_from_task_data_after_insert(self, with_clean_data_store: None) -> None:
@@ -103,44 +117,44 @@ class TestKKVDataStore(BaseTest):
         result2 = my_task.data["the_id"]("newKey3", "newKey4")
         assert result2 == "newValue2"

-    def test_can_update_a_value(self, with_clean_data_store: None) -> None:
+    def test_can_update_a_value(self, with_clean_data_store: KKVDataStore) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}}})
         kkv_data_store.set(my_task)
         my_task.data = {"the_id": {"newKey1": {"newKey2": "newValue2"}}}
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 1
         kkv_data_store.get(my_task)
         result2 = my_task.data["the_id"]("newKey1", "newKey2")
         assert result2 == "newValue2"

-    def test_can_delete_record_by_nulling_a_secondary_key(self, with_key1_key2_record: None) -> None:
+    def test_can_delete_record_by_nulling_a_secondary_key(self, with_key1_key2_record: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"key1": {"key2": None}}})
         kkv_data_store.set(my_task)
         kkv_data_store.get(my_task)
         result = my_task.data["the_id"]("key1", "key2")
         assert result is None
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_key1_key2_record)
         assert count == 0

-    def test_can_delete_all_records_by_nulling_a_top_level_key(self, with_clean_data_store: None) -> None:
+    def test_can_delete_all_records_by_nulling_a_top_level_key(self, with_clean_data_store: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue", "newKey3": "newValue2"}}})
         kkv_data_store.set(my_task)
         my_task.data = {"the_id": {"newKey1": None}}
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 0

-    def test_top_key_delete_does_not_delete_for_other_top_levels(self, with_clean_data_store: None) -> None:
+    def test_top_key_delete_does_not_delete_for_other_top_levels(self, with_clean_data_store: KKVDataStoreModel) -> None:
         kkv_data_store = KKVDataStore("the_id", "the_name")
         my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
         kkv_data_store.set(my_task)
         my_task.data = {"the_id": {"newKey1": None}}
         kkv_data_store.set(my_task)
-        count = db.session.query(KKVDataStoreModel).count()
+        count = self._entry_count(with_clean_data_store)
         assert count == 1
         kkv_data_store.get(my_task)
         result = my_task.data["the_id"]("newKey3", "newKey4")

@@ -37,21 +37,26 @@ export default function DataStoreListTable() {
       'datastore'
     );
     const dataStoreType = searchParams.get('type') || '';
-    const dataStoreName = searchParams.get('name') || '';
+    const dataStoreIdentifier = searchParams.get('identifier') || '';
+    const dataStoreLocation = searchParams.get('location') || '';

-    if (dataStoreType === '' || dataStoreName === '') {
+    if (dataStoreType === '' || dataStoreIdentifier === '') {
       return;
     }

-    if (dataStores && dataStoreName && dataStoreType) {
+    if (dataStores && dataStoreIdentifier && dataStoreType) {
       dataStores.forEach((ds) => {
-        if (ds.name === dataStoreName && ds.type === dataStoreType) {
+        if (
+          ds.id === dataStoreIdentifier &&
+          ds.type === dataStoreType &&
+          ds.location === dataStoreLocation
+        ) {
           setDataStore(ds);
         }
       });
     }
-    const queryParamString = `per_page=${perPage}&page=${page}`;
+    const queryParamString = `per_page=${perPage}&page=${page}&location=${dataStoreLocation}`;
     HttpService.makeCallToBackend({
-      path: `/data-stores/${dataStoreType}/${dataStoreName}/items?${queryParamString}`,
+      path: `/data-stores/${dataStoreType}/${dataStoreIdentifier}/items?${queryParamString}`,
       successCallback: (response: DataStoreRecords) => {
         setResults(response.results);
         setPagination(response.pagination);
@@ -101,6 +106,10 @@ export default function DataStoreListTable() {
     );
   };

+  const locationDescription = (ds: DataStore) => {
+    return ds.location ? ` @ ${ds.location}` : '';
+  };
+
   const { page, perPage } = getPageInfoFromSearchParams(
     searchParams,
     10,
@@ -116,13 +125,16 @@ export default function DataStoreListTable() {
           label="Please select a data store"
           items={dataStores}
           selectedItem={dataStore}
-          itemToString={(ds: DataStore) => (ds ? `${ds.name} (${ds.type})` : '')}
+          itemToString={(ds: DataStore) =>
+            ds ? `${ds.name} (${ds.type}${locationDescription(ds)})` : ''
+          }
           onChange={(event: any) => {
             setDataStore(event.selectedItem);
             searchParams.set('datastore_page', '1');
             searchParams.set('datastore_per_page', '10');
             searchParams.set('type', event.selectedItem.type);
-            searchParams.set('name', event.selectedItem.name);
+            searchParams.set('identifier', event.selectedItem.id);
+            searchParams.set('location', event.selectedItem.location);
             setSearchParams(searchParams);
           }}
         />

@@ -69,7 +69,7 @@ export default function DataStoreListTiles({
           <ClickableTile
             id={`data-store-tile-${row.id}`}
             className="tile-data-store"
-            href={`/data-stores/${row.id}/edit?parentGroupId=${
+            href={`/data-stores/${row.id}/edit?type=${row.type}&parentGroupId=${
               processGroup?.id ?? ''
             }`}
           >

@@ -464,6 +464,7 @@ export interface DataStore {
   id: string;
   schema: string;
   description?: string | null;
+  location?: string | null;
 }

 export interface DataStoreType {

@@ -10,6 +10,7 @@ export default function DataStoreEdit() {
   const params = useParams();
   const [searchParams] = useSearchParams();
   const parentGroupId = searchParams.get('parentGroupId');
+  const dataStoreType = searchParams.get('type');
   const dataStoreIdentifier = params.data_store_identifier;
   const [dataStore, setDataStore] = useState<DataStore>({
     id: '',
@@ -30,10 +31,10 @@ export default function DataStoreEdit() {
     const queryParams = `?process_group_identifier=${parentGroupId}`;
     HttpService.makeCallToBackend({
-      path: `/data-stores/json/${dataStoreIdentifier}${queryParams}`,
+      path: `/data-stores/${dataStoreType}/${dataStoreIdentifier}${queryParams}`,
       successCallback: setDataStoreFromResult,
     });
-  }, [dataStoreIdentifier, parentGroupId]);
+  }, [dataStoreIdentifier, parentGroupId, dataStoreType]);

   const hotCrumbs: HotCrumbItem[] = [['Process Groups', '/process-groups']];
   if (parentGroupId) {