airbyte-custom-connector/source-simplecast-fecther/source_simplecast_fecther/source.py

241 lines
8.7 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import logging
import requests
import time
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
# Shared logger used by every stream in this module.
logger = logging.getLogger("airbyte")

# Field names copied from each element of an analytics response into the
# emitted record, grouped by the kind of analytics endpoint that uses them.
LOCATION_KEYS = [
    "id",
    "rank",
    "name",
    "downloads_total",
    "downloads_percent",
]
TIME_OF_WEEK_KEYS = [
    "rank",
    "hour_of_week",
    "hour_of_day",
    "day_of_week",
    "count",
]
DOWNLOADS_KEY = [
    "interval",
    "downloads_total",
    "downloads_percent",
]
TECH_KEY = [
    "rank",
    "name",
    "downloads_total",
    "downloads_percent",
]
# Basic full refresh stream
class SimplecastFectherStream(HttpStream):
    """Base stream for the Simplecast API using limit/offset pagination.

    Subclasses provide ``path`` and ``parse_response``; this class supplies
    the base URL and the pagination handling shared by every stream.
    """

    url_base = "https://api.simplecast.com/"
    primary_key = None

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the ``limit``/``offset`` parameters for the next page.

        The token is derived from the ``pages`` object of the JSON response.

        :param response: the HTTP response of the page that was just read.
        :return: a mapping with ``limit`` and ``offset``, or ``None`` when the
            response has no ``pages`` object or no ``next`` page.
        """
        pages = response.json().get("pages")
        if not pages or not pages.get("next"):
            return None

        # Pause before the next page is requested.
        # NOTE(review): presumably a guard against API rate limiting - confirm.
        time.sleep(2)

        limit = pages.get("limit")
        # NOTE(review): assumes ``current`` is the 1-based number of the page
        # just read, so ``limit * current`` is the offset of the next page.
        return {
            "limit": limit,
            "offset": limit * pages.get("current"),
        }

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters for the next request.

        :param stream_state: unused.
        :param stream_slice: unused.
        :param next_page_token: the value returned by ``next_page_token`` for
            the previous page, if any.
        :return: a copy of the pagination token, or an empty mapping for the
            first page (never ``None``, matching the declared return type).
        """
        if next_page_token:
            return dict(next_page_token)
        return {}
class Podcast(SimplecastFectherStream):
    """Stream of podcasts read from the ``podcasts`` endpoint."""

    primary_key = "id"

    @property
    def use_cache(self) -> bool:
        """Return ``True``: this stream is used as a parent by other streams."""
        return True

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "podcasts"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one flattened record per element of the ``collection`` array.

        Nested values (``episodes.count`` and ``account.owner.name``) are
        emitted as ``None`` when any intermediate object is missing or null,
        instead of raising ``AttributeError``.

        :param response: the HTTP response of a ``podcasts`` page.
        :return: an iterable of podcast records.
        """
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in data.get("collection"):
            # ``or {}`` tolerates both an absent key and an explicit null.
            episodes = elt.get("episodes") or {}
            owner = (elt.get("account") or {}).get("owner") or {}
            yield {
                "id": elt.get("id"),
                "title": elt.get("title"),
                "status": elt.get("status"),
                "href": elt.get("href"),
                "episode_count": episodes.get("count"),
                "account_id": elt.get("account_id"),
                "account_owner_name": owner.get("name"),
            }
class Episode(HttpSubStream, SimplecastFectherStream):
    """Stream of episodes, read once per parent podcast record."""

    primary_key = "id"

    @property
    def use_cache(self) -> bool:
        """Return ``True``: this stream is used as a parent by other streams."""
        return True

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the episodes path of the podcast found in the slice.

        The podcast id is taken from the record stored under the ``parent``
        key of ``stream_slice``.
        """
        parent_record = stream_slice.get("parent")
        podcast_id = parent_record.get("id")
        return f"podcasts/{podcast_id}/episodes"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per element of the ``collection`` array.

        Each record keeps only the episode fields listed below; a field
        missing from the response element is emitted as ``None``.
        """
        fields = (
            "id",
            "title",
            "status",
            "published_at",
            "updated_at",
            "season",
            "number",
            "description",
            "token",
            "type",
        )
        payload = response.json()
        logger.debug("Response: %s", payload)
        for item in payload.get("collection"):
            yield {field: item.get(field) for field in fields}
class AnalyticSubStream(HttpSubStream, SimplecastFectherStream, ABC):
    """Generic per-podcast analytics stream.

    A concrete subclass only has to say which analytics endpoint to call,
    which JSON key holds the list of elements and which fields of each
    element to keep.
    """

    primary_key = None

    def __init__(self, endpoint: str, keys_dict: Iterable[str], collection_name: str, **kwargs):
        """Configure the analytics endpoint read by this stream.

        :param endpoint: path segment appended to ``analytics/``.
        :param keys_dict: names of the fields copied from each element into
            the emitted record. It is iterated once per element, so it must be
            re-iterable (a list, tuple or dict, not a generator).
        :param collection_name: key of the response JSON holding the elements.
        :param kwargs: forwarded unchanged to the parent constructors.
        """
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.keys_dict = keys_dict
        self.collection_name = collection_name

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the analytics path for the podcast found in the slice.

        The podcast id is taken from the record stored under the ``parent``
        key of ``stream_slice`` and passed as the ``podcast`` query parameter.
        """
        podcast_id = stream_slice.get("parent").get("id")
        return f"analytics/{self.endpoint}?podcast={podcast_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per element of the ``collection_name`` array.

        Default implementation: each record is a plain key-to-key copy of the
        fields listed in ``keys_dict``. If the object mapping is not a simple
        key mapping, this method has to be overridden.

        A response where the collection is missing or null yields no records
        instead of raising ``TypeError``.
        """
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in data.get(self.collection_name) or []:
            logger.debug("Elt %s", elt)
            yield {key: elt.get(key) for key in self.keys_dict}
class AnalyticLocation(AnalyticSubStream):
    """Analytics stream reading the ``countries`` list of the ``location`` endpoint."""

    primary_key = None

    def __init__(self, **kwargs):
        """Bind the stream to the ``location`` endpoint; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="location",
            keys_dict=LOCATION_KEYS,
            collection_name="countries",
            **kwargs,
        )
class AnalyticTimeOfWeek(AnalyticSubStream):
    """Analytics stream reading the ``collection`` list of the ``time_of_week`` endpoint."""

    primary_key = None

    def __init__(self, **kwargs):
        """Bind the stream to the ``time_of_week`` endpoint; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="time_of_week",
            keys_dict=TIME_OF_WEEK_KEYS,
            collection_name="collection",
            **kwargs,
        )
class AnalyticDownload(AnalyticSubStream):
    """Analytics stream reading the ``by_interval`` list of the ``downloads`` endpoint."""

    def __init__(self, **kwargs):
        """Bind the stream to the ``downloads`` endpoint; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="downloads",
            keys_dict=DOWNLOADS_KEY,
            collection_name="by_interval",
            **kwargs,
        )
class TechnologyApplication(AnalyticSubStream):
    """Analytics stream reading the ``collection`` list of ``technology/applications``."""

    def __init__(self, **kwargs):
        """Bind the stream to ``technology/applications``; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="technology/applications",
            keys_dict=TECH_KEY,
            collection_name="collection",
            **kwargs,
        )
class TechnologyDeviceClass(AnalyticSubStream):
    """Analytics stream reading the ``collection`` list of ``technology/devices``."""

    def __init__(self, **kwargs):
        """Bind the stream to ``technology/devices``; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="technology/devices",
            keys_dict=TECH_KEY,
            collection_name="collection",
            **kwargs,
        )
class TechnologyListeningMethod(AnalyticSubStream):
    """Analytics stream reading the ``collection`` list of ``technology/listening_methods``."""

    def __init__(self, **kwargs):
        """Bind the stream to ``technology/listening_methods``; ``kwargs`` are forwarded."""
        super().__init__(
            endpoint="technology/listening_methods",
            keys_dict=TECH_KEY,
            collection_name="collection",
            **kwargs,
        )
class AnalyticEpisodeV2(HttpSubStream, SimplecastFectherStream):
    """Analytics summary stream, read once per parent episode record."""

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the analytics path for the episode found in the slice.

        The episode id is taken from the record stored under the ``parent``
        key of ``stream_slice``.
        """
        parent_record = stream_slice.get("parent")
        episode_id = parent_record.get("id")
        return f"analytics?episode={episode_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the whole JSON body of the response as a single record."""
        payload = response.json()
        logger.debug("Response: %s", payload)
        yield payload
class AnalyticPodcastV2(HttpSubStream, SimplecastFectherStream):
    """Analytics summary stream, read once per parent podcast record."""

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the analytics path for the podcast found in the slice.

        The podcast id is taken from the record stored under the ``parent``
        key of ``stream_slice``.
        """
        parent_record = stream_slice.get("parent")
        podcast_id = parent_record.get("id")
        return f"analytics?podcast={podcast_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the whole JSON body of the response as a single record."""
        payload = response.json()
        logger.debug("Response: %s", payload)
        yield payload
class AnalyticEpisodeDownload(HttpSubStream, SimplecastFectherStream):
    """Download analytics stream, read once per parent episode record."""

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the downloads analytics path for the episode found in the slice.

        The episode id is taken from the record stored under the ``parent``
        key of ``stream_slice``.
        """
        parent_record = stream_slice.get("parent")
        episode_id = parent_record.get("id")
        return f"analytics/downloads?episode={episode_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the whole JSON body of the response as a single record."""
        payload = response.json()
        logger.debug("Response: %s", payload)
        yield payload
# Source
class SourceSimplecastFecther(AbstractSource):
    """Airbyte source exposing Simplecast podcasts, episodes and analytics."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Check that the configured API key can read from the Simplecast API.

        The check requests the first record of the ``Podcast`` stream. Only
        the first page is fetched because the generator is not consumed any
        further.

        :param logger: logger supplied by the caller (unused).
        :param config: connector configuration; must contain ``api_key``.
        :return: ``(True, None)`` on success, otherwise ``(False, message)``
            describing why the connection failed.
        """
        # Local import so the file's top-level import block needs no change.
        from airbyte_cdk.models import SyncMode

        try:
            auth = TokenAuthenticator(token=config["api_key"])
            podcasts = Podcast(authenticator=auth)
            # An account without podcasts is still a valid connection, hence
            # the ``None`` default instead of letting StopIteration escape.
            next(iter(podcasts.read_records(sync_mode=SyncMode.full_refresh)), None)
        except Exception as error:  # boundary: report any failure to the caller
            return False, f"Unable to connect to the Simplecast API: {error!r}"
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build every stream of the source, sharing one authenticator.

        Podcast-level analytics streams use the podcast stream as parent;
        episode-level analytics streams use the episode stream as parent.

        :param config: connector configuration; must contain ``api_key``.
        :return: the list of configured streams.
        """
        auth = TokenAuthenticator(token=config["api_key"])
        podcasts = Podcast(authenticator=auth)
        episodes = Episode(authenticator=auth, parent=podcasts)
        return [
            podcasts,
            episodes,
            AnalyticLocation(authenticator=auth, parent=podcasts),
            AnalyticTimeOfWeek(authenticator=auth, parent=podcasts),
            AnalyticDownload(authenticator=auth, parent=podcasts),
            TechnologyApplication(authenticator=auth, parent=podcasts),
            TechnologyDeviceClass(authenticator=auth, parent=podcasts),
            TechnologyListeningMethod(authenticator=auth, parent=podcasts),
            AnalyticEpisodeV2(authenticator=auth, parent=episodes),
            AnalyticPodcastV2(authenticator=auth, parent=podcasts),
            AnalyticEpisodeDownload(authenticator=auth, parent=episodes),
        ]