add XCom Redis backend implementation

Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
Jakub Sokołowski 2021-03-30 14:15:02 +02:00
parent a3c200806c
commit 0323b8d3bf
No known key found for this signature in database
GPG Key ID: 4EF064D0E6D63020
2 changed files with 29 additions and 12 deletions

View File

@ -1,6 +1,31 @@
# -*- coding: utf-8 -*-
"""Module init-file.
from uuid import uuid4
from typing import Any
from airflow.models.xcom import BaseXCom
from airflow.providers.redis.hooks.redis import RedisHook
The __init__.py files are required to make Python treat directories
containing the file as packages.
"""
class XComRedisBackend(BaseXCom):
PREFIX = 'xcom_redis://'
# This connection needs to exist in advance
CONN_ID = 'xcom_cache'
@staticmethod
def serialize_value(value: Any):
hook = RedisHook(redis_conn_id=XComRedisBackend.CONN_ID)
key = str(uuid4())
# We use the default serializer, which pickles or JSONs
hook.get_conn().set(key, BaseXCom.serialize_value(value))
# Add prefix to make it clear where the value is stored.
value = XComRedisBackend.PREFIX + key
return BaseXCom.serialize_value(value)
@staticmethod
def deserialize_value(result) -> Any:
result = BaseXCom.deserialize_value(result)
prefix = XComRedisBackend.PREFIX
if isinstance(result, str) and result.startswith(prefix):
key = result.replace(prefix, "")
hook = RedisHook(redis_conn_id=XComRedisBackend.CONN_ID)
result = hook.get_conn().get(key)
result = BaseXCom.deserialize_value(result)
return result

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
"""Module init-file.
A module's __name__ is set equal to '__main__' when read from standard input,
a script, or from an interactive prompt.
"""
print('TODO')