diff --git a/source-twitter-fetcher/source_twitter_fetcher/auth.py b/source-twitter-fetcher/source_twitter_fetcher/auth.py index e33b670..b6011bf 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/auth.py +++ b/source-twitter-fetcher/source_twitter_fetcher/auth.py @@ -2,6 +2,7 @@ from typing import Any, Mapping, Union import requests +import logging from airbyte_cdk.models import FailureType from airbyte_cdk.sources.streams.http.requests_native_auth import ( BasicHttpAuthenticator, @@ -10,12 +11,15 @@ from airbyte_cdk.sources.streams.http.requests_native_auth import ( ) from airbyte_cdk.utils import AirbyteTracedException +logger = logging.getLogger("airbyte") + class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator): """ https://developer.x.com/en/docs/authentication/oauth-2-0/user-access-token """ def build_refresh_request_headers(self) -> Mapping[str, Any]: + logger.info("Refreshing token") return { "Authorization": BasicHttpAuthenticator(self.get_client_id(), self.get_client_secret()).token, "Content-Type": "application/x-www-form-urlencoded", @@ -35,7 +39,9 @@ class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator): headers=self.build_refresh_request_headers(), ) content = response.json() - if response.status_code == 400 and content.get("error") == "invalid_grant": + logger.info("Refresh - response status code %s", response.status_code) + if response.status_code == 400 and content.get("error") == "invalid_request": + logger.error("Error when refreshing token: %s", content) raise AirbyteTracedException( internal_message=content.get("error_description"), message="Refresh token is invalid or expired. Please re-authenticate to restore access to Twitter API.", diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py index 18f50fa..dbf125e 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/source.py +++ b/source-twitter-fetcher/source_twitter_fetcher/source.py @@ -13,6 +13,7 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator +from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode from .auth import TwitterOAuth @@ -63,14 +64,13 @@ class Tweet(TwitterStream): stream_slice: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: params = { - "tweet.fields" : "text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at", + "tweet.fields" : "text,public_metrics,author_id,referenced_tweets,created_at", "max_results": 100 } # Add condition later: params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}) if next_page_token: params.update(**next_page_token) - logger.info(f"DBG - query params: %s", params) return params @@ -82,7 +82,6 @@ class Tweet(TwitterStream): if 'data' in response.json(): data=response.json()['data'] for t in data: - logger.debug("DBG-T: id %s", t.get('id')) yield t time.sleep(2) @@ -94,10 +93,19 @@ class TweetMetrics(HttpSubStream, Tweet): stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: - tweet_id = stream_slice.get("parent").get("id") + tweet_id = stream_slice.get("id") logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id) return f"tweets/{tweet_id}" + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + limit_date = datetime.today()- timedelta(31) + for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh): + tweet = parent_slice["parent"] + if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date: + yield {"id": tweet.get('id') } + else: + logger.info("Not calling full metrics endpoint for tweet %s, tweet too old", tweet.get('id')) + def request_params( self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, @@ -111,7 +119,6 @@ class TweetMetrics(HttpSubStream, Tweet): return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - logger.info("Twtter Response: %s", response.json()) if 'data' in response.json(): data=response.json()['data'] logger.debug("DBG-FULL-T: id %s", data.get('id'))