Keyed key/value data store implementation (#548)
This commit is contained in:
parent
4d7347c131
commit
a80abc4c47
|
@ -0,0 +1,44 @@
|
||||||
|
"""empty message
|
||||||
|
|
||||||
|
Revision ID: 78f5c2c65bf3
|
||||||
|
Revises: 698a921acb46
|
||||||
|
Create Date: 2023-10-17 10:03:40.828441
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '78f5c2c65bf3'
|
||||||
|
down_revision = '698a921acb46'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table('kkv_data_store',
|
||||||
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('top_level_key', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('secondary_key', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('value', sa.JSON(), nullable=False),
|
||||||
|
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=False),
|
||||||
|
sa.Column('created_at_in_seconds', sa.Integer(), nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint('id')
|
||||||
|
)
|
||||||
|
with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
|
||||||
|
batch_op.create_index(batch_op.f('ix_kkv_data_store_secondary_key'), ['secondary_key'], unique=False)
|
||||||
|
batch_op.create_index(batch_op.f('ix_kkv_data_store_top_level_key'), ['top_level_key'], unique=False)
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table('kkv_data_store', schema=None) as batch_op:
|
||||||
|
batch_op.drop_index(batch_op.f('ix_kkv_data_store_top_level_key'))
|
||||||
|
batch_op.drop_index(batch_op.f('ix_kkv_data_store_secondary_key'))
|
||||||
|
|
||||||
|
op.drop_table('kkv_data_store')
|
||||||
|
# ### end Alembic commands ###
|
|
@ -0,0 +1,87 @@
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from SpiffWorkflow.bpmn.serializer.helpers.registry import BpmnConverter # type: ignore
|
||||||
|
from SpiffWorkflow.bpmn.specs.data_spec import BpmnDataStoreSpecification # type: ignore
|
||||||
|
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
|
||||||
|
|
||||||
|
|
||||||
|
class KKVDataStore(BpmnDataStoreSpecification): # type: ignore
|
||||||
|
"""KKVDataStore."""
|
||||||
|
|
||||||
|
def _get_model(self, top_level_key: str, secondary_key: str) -> KKVDataStoreModel | None:
|
||||||
|
model = (
|
||||||
|
db.session.query(KKVDataStoreModel)
|
||||||
|
.filter_by(top_level_key=top_level_key, secondary_key=secondary_key)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
return model
|
||||||
|
|
||||||
|
def _delete_all_for_top_level_key(self, top_level_key: str) -> None:
|
||||||
|
models = db.session.query(KKVDataStoreModel).filter_by(top_level_key=top_level_key).all()
|
||||||
|
for model in models:
|
||||||
|
db.session.delete(model)
|
||||||
|
|
||||||
|
def get(self, my_task: SpiffTask) -> None:
|
||||||
|
"""get."""
|
||||||
|
|
||||||
|
def getter(top_level_key: str, secondary_key: str) -> Any | None:
|
||||||
|
model = self._get_model(top_level_key, secondary_key)
|
||||||
|
if model is not None:
|
||||||
|
return model.value
|
||||||
|
return None
|
||||||
|
|
||||||
|
my_task.data[self.bpmn_id] = getter
|
||||||
|
|
||||||
|
def set(self, my_task: SpiffTask) -> None:
|
||||||
|
"""set."""
|
||||||
|
data = my_task.data[self.bpmn_id]
|
||||||
|
if type(data) != dict:
|
||||||
|
raise Exception(
|
||||||
|
f"When writing to this data store, a dictionary is expected as the value for variable '{self.bpmn_id}'"
|
||||||
|
)
|
||||||
|
for top_level_key, second_level in data.items():
|
||||||
|
if second_level is None:
|
||||||
|
self._delete_all_for_top_level_key(top_level_key)
|
||||||
|
continue
|
||||||
|
if type(second_level) != dict:
|
||||||
|
raise Exception(
|
||||||
|
"When writing to this data store, a dictionary is expected as the value for"
|
||||||
|
f" '{self.bpmn_id}[\"{top_level_key}\"]'"
|
||||||
|
)
|
||||||
|
for secondary_key, value in second_level.items():
|
||||||
|
model = self._get_model(top_level_key, secondary_key)
|
||||||
|
if model is None and value is None:
|
||||||
|
continue
|
||||||
|
if value is None:
|
||||||
|
db.session.delete(model)
|
||||||
|
continue
|
||||||
|
if model is None:
|
||||||
|
model = KKVDataStoreModel(top_level_key=top_level_key, secondary_key=secondary_key, value=value)
|
||||||
|
else:
|
||||||
|
model.value = value
|
||||||
|
db.session.add(model)
|
||||||
|
db.session.commit()
|
||||||
|
del my_task.data[self.bpmn_id]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def register_data_store_class(data_store_classes: dict[str, Any]) -> None:
|
||||||
|
data_store_classes["KKVDataStore"] = KKVDataStore
|
||||||
|
|
||||||
|
|
||||||
|
class KKVDataStoreConverter(BpmnConverter): # type: ignore
|
||||||
|
"""KKVDataStoreConverter."""
|
||||||
|
|
||||||
|
def to_dict(self, spec: Any) -> dict[str, Any]:
|
||||||
|
"""to_dict."""
|
||||||
|
return {
|
||||||
|
"bpmn_id": spec.bpmn_id,
|
||||||
|
"bpmn_name": spec.bpmn_name,
|
||||||
|
"capacity": spec.capacity,
|
||||||
|
"is_unlimited": spec.is_unlimited,
|
||||||
|
}
|
||||||
|
|
||||||
|
def from_dict(self, dct: dict[str, Any]) -> KKVDataStore:
|
||||||
|
"""from_dict."""
|
||||||
|
return KKVDataStore(**dct)
|
|
@ -91,6 +91,9 @@ from spiffworkflow_backend.models.typeahead import (
|
||||||
from spiffworkflow_backend.models.json_data_store import (
|
from spiffworkflow_backend.models.json_data_store import (
|
||||||
JSONDataStoreModel,
|
JSONDataStoreModel,
|
||||||
) # noqa: F401
|
) # noqa: F401
|
||||||
|
from spiffworkflow_backend.models.kkv_data_store import (
|
||||||
|
KKVDataStoreModel,
|
||||||
|
) # noqa: F401
|
||||||
from spiffworkflow_backend.models.task_draft_data import (
|
from spiffworkflow_backend.models.task_draft_data import (
|
||||||
TaskDraftDataModel,
|
TaskDraftDataModel,
|
||||||
) # noqa: F401
|
) # noqa: F401
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class KKVDataStoreModel(SpiffworkflowBaseDBModel):
|
||||||
|
__tablename__ = "kkv_data_store"
|
||||||
|
|
||||||
|
id: int = db.Column(db.Integer, primary_key=True)
|
||||||
|
top_level_key: str = db.Column(db.String(255), nullable=False, index=True)
|
||||||
|
secondary_key: str = db.Column(db.String(255), nullable=False, index=True)
|
||||||
|
value: dict = db.Column(db.JSON, nullable=False)
|
||||||
|
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||||
|
created_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
|
@ -4,6 +4,7 @@ from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser # type: ignore
|
||||||
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
|
from SpiffWorkflow.spiff.parser.process import SpiffBpmnParser # type: ignore
|
||||||
from spiffworkflow_backend.data_stores.json import JSONDataStore
|
from spiffworkflow_backend.data_stores.json import JSONDataStore
|
||||||
from spiffworkflow_backend.data_stores.json import JSONFileDataStore
|
from spiffworkflow_backend.data_stores.json import JSONFileDataStore
|
||||||
|
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
|
||||||
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStore
|
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStore
|
||||||
from spiffworkflow_backend.specs.start_event import StartEvent
|
from spiffworkflow_backend.specs.start_event import StartEvent
|
||||||
|
|
||||||
|
@ -18,6 +19,7 @@ class MyCustomParser(BpmnDmnParser): # type: ignore
|
||||||
|
|
||||||
DATA_STORE_CLASSES: dict[str, Any] = {}
|
DATA_STORE_CLASSES: dict[str, Any] = {}
|
||||||
|
|
||||||
|
KKVDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
||||||
JSONDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
JSONDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
||||||
JSONFileDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
JSONFileDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
||||||
TypeaheadDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
TypeaheadDataStore.register_data_store_class(DATA_STORE_CLASSES)
|
||||||
|
|
|
@ -49,6 +49,8 @@ from spiffworkflow_backend.data_stores.json import JSONDataStore
|
||||||
from spiffworkflow_backend.data_stores.json import JSONDataStoreConverter
|
from spiffworkflow_backend.data_stores.json import JSONDataStoreConverter
|
||||||
from spiffworkflow_backend.data_stores.json import JSONFileDataStore
|
from spiffworkflow_backend.data_stores.json import JSONFileDataStore
|
||||||
from spiffworkflow_backend.data_stores.json import JSONFileDataStoreConverter
|
from spiffworkflow_backend.data_stores.json import JSONFileDataStoreConverter
|
||||||
|
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
|
||||||
|
from spiffworkflow_backend.data_stores.kkv import KKVDataStoreConverter
|
||||||
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStore
|
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStore
|
||||||
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStoreConverter
|
from spiffworkflow_backend.data_stores.typeahead import TypeaheadDataStoreConverter
|
||||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||||
|
@ -99,6 +101,7 @@ from sqlalchemy import and_
|
||||||
SPIFF_CONFIG[StartEvent] = EventConverter
|
SPIFF_CONFIG[StartEvent] = EventConverter
|
||||||
SPIFF_CONFIG[JSONDataStore] = JSONDataStoreConverter
|
SPIFF_CONFIG[JSONDataStore] = JSONDataStoreConverter
|
||||||
SPIFF_CONFIG[JSONFileDataStore] = JSONFileDataStoreConverter
|
SPIFF_CONFIG[JSONFileDataStore] = JSONFileDataStoreConverter
|
||||||
|
SPIFF_CONFIG[KKVDataStore] = KKVDataStoreConverter
|
||||||
SPIFF_CONFIG[TypeaheadDataStore] = TypeaheadDataStoreConverter
|
SPIFF_CONFIG[TypeaheadDataStore] = TypeaheadDataStoreConverter
|
||||||
|
|
||||||
# Sorry about all this crap. I wanted to move this thing to another file, but
|
# Sorry about all this crap. I wanted to move this thing to another file, but
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
from collections.abc import Generator
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from flask.app import Flask
|
||||||
|
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
from spiffworkflow_backend.models.kkv_data_store import KKVDataStoreModel
|
||||||
|
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MockTask:
|
||||||
|
data: dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def with_clean_data_store(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]:
|
||||||
|
db.session.query(KKVDataStoreModel).delete()
|
||||||
|
db.session.commit()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def with_key1_key2_record(with_clean_data_store: None) -> Generator[None, None, None]:
|
||||||
|
model = KKVDataStoreModel(top_level_key="key1", secondary_key="key2", value="value1") # type: ignore
|
||||||
|
db.session.add(model)
|
||||||
|
db.session.commit()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
class TestKKVDataStore(BaseTest):
|
||||||
|
"""Infer from class name."""
|
||||||
|
|
||||||
|
def test_returns_none_if_no_records_exist(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={})
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
assert len(my_task.data) == 1
|
||||||
|
result = my_task.data["the_id"]("key1", "key2")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_can_return_value_if_both_keys_match(self, with_key1_key2_record: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={})
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
assert len(my_task.data) == 1
|
||||||
|
result = my_task.data["the_id"]("key1", "key2")
|
||||||
|
assert result == "value1"
|
||||||
|
|
||||||
|
def test_returns_none_if_first_key_does_not_match(self, with_key1_key2_record: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={})
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
assert len(my_task.data) == 1
|
||||||
|
result = my_task.data["the_id"]("key11", "key2")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_returns_none_if_second_key_does_not_match(self, with_key1_key2_record: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={})
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
assert len(my_task.data) == 1
|
||||||
|
result = my_task.data["the_id"]("key1", "key22")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_can_insert_a_value(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 1
|
||||||
|
|
||||||
|
def test_can_insert_mulitple_values_for_same_top_key(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue", "newKey3": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 2
|
||||||
|
|
||||||
|
def test_can_insert_mulitple_values_for_different_top_key(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 2
|
||||||
|
|
||||||
|
def test_value_is_removed_from_task_data_after_insert(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
assert "the_id" not in my_task.data
|
||||||
|
|
||||||
|
def test_can_get_after_a_set(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
result1 = my_task.data["the_id"]("newKey1", "newKey2")
|
||||||
|
assert result1 == "newValue"
|
||||||
|
result2 = my_task.data["the_id"]("newKey3", "newKey4")
|
||||||
|
assert result2 == "newValue2"
|
||||||
|
|
||||||
|
def test_can_update_a_value(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
my_task.data = {"the_id": {"newKey1": {"newKey2": "newValue2"}}}
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 1
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
result2 = my_task.data["the_id"]("newKey1", "newKey2")
|
||||||
|
assert result2 == "newValue2"
|
||||||
|
|
||||||
|
def test_can_delete_record_by_nulling_a_secondary_key(self, with_key1_key2_record: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"key1": {"key2": None}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
result = my_task.data["the_id"]("key1", "key2")
|
||||||
|
assert result is None
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 0
|
||||||
|
|
||||||
|
def test_can_delete_all_records_by_nulling_a_top_level_key(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue", "newKey3": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
my_task.data = {"the_id": {"newKey1": None}}
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 0
|
||||||
|
|
||||||
|
def test_top_key_delete_does_not_delete_for_other_top_levels(self, with_clean_data_store: None) -> None:
|
||||||
|
kkv_data_store = KKVDataStore("the_id", "the_name")
|
||||||
|
my_task = MockTask(data={"the_id": {"newKey1": {"newKey2": "newValue"}, "newKey3": {"newKey4": "newValue2"}}})
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
my_task.data = {"the_id": {"newKey1": None}}
|
||||||
|
kkv_data_store.set(my_task)
|
||||||
|
count = db.session.query(KKVDataStoreModel).count()
|
||||||
|
assert count == 1
|
||||||
|
kkv_data_store.get(my_task)
|
||||||
|
result = my_task.data["the_id"]("newKey3", "newKey4")
|
||||||
|
assert result == "newValue2"
|
Loading…
Reference in New Issue