Allow getting all secondary keys and values for a KKV top level key (#1016)

This commit is contained in:
jbirddog 2024-02-12 09:50:34 -05:00 committed by GitHub
parent 0174472f50
commit c6c1a4a593
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -71,7 +71,7 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
def get(self, my_task: SpiffTask) -> None:
"""get."""
def getter(top_level_key: str, secondary_key: str) -> Any | None:
def getter(top_level_key: str, secondary_key: str | None) -> Any | None:
location = self.data_store_location_for_task(KKVDataStoreModel, my_task, self.bpmn_id)
store_model: KKVDataStoreModel | None = None
@ -81,15 +81,26 @@ class KKVDataStore(BpmnDataStoreSpecification, DataStoreCRUD): # type: ignore
if store_model is None:
raise DataStoreReadError(f"Unable to locate kkv data store '{self.bpmn_id}'.")
model = (
if secondary_key is not None:
model = (
db.session.query(KKVDataStoreEntryModel)
.filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key, secondary_key=secondary_key)
.first()
)
if model is not None:
return model.value
return None
models = (
db.session.query(KKVDataStoreEntryModel)
.filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key, secondary_key=secondary_key)
.first()
.filter_by(kkv_data_store_id=store_model.id, top_level_key=top_level_key)
.all()
)
if model is not None:
return model.value
return None
values = {model.secondary_key: model.value for model in models}
return values
my_task.data[self.bpmn_id] = getter