twitter-fetch: limit tweet metrics calls
Calling full metrics call per tweet only for the last month Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
parent
6de9ccb1e2
commit
2c77efd487
|
@ -2,6 +2,7 @@
|
||||||
from typing import Any, Mapping, Union
|
from typing import Any, Mapping, Union
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
import logging
|
||||||
from airbyte_cdk.models import FailureType
|
from airbyte_cdk.models import FailureType
|
||||||
from airbyte_cdk.sources.streams.http.requests_native_auth import (
|
from airbyte_cdk.sources.streams.http.requests_native_auth import (
|
||||||
BasicHttpAuthenticator,
|
BasicHttpAuthenticator,
|
||||||
|
@ -10,12 +11,15 @@ from airbyte_cdk.sources.streams.http.requests_native_auth import (
|
||||||
)
|
)
|
||||||
from airbyte_cdk.utils import AirbyteTracedException
|
from airbyte_cdk.utils import AirbyteTracedException
|
||||||
|
|
||||||
|
logger = logging.getLogger("airbyte")
|
||||||
|
|
||||||
class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator):
|
class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator):
|
||||||
"""
|
"""
|
||||||
https://developer.x.com/en/docs/authentication/oauth-2-0/user-access-token
|
https://developer.x.com/en/docs/authentication/oauth-2-0/user-access-token
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def build_refresh_request_headers(self) -> Mapping[str, Any]:
|
def build_refresh_request_headers(self) -> Mapping[str, Any]:
|
||||||
|
logger.info("Refreshing token")
|
||||||
return {
|
return {
|
||||||
"Authorization": BasicHttpAuthenticator(self.get_client_id(), self.get_client_secret()).token,
|
"Authorization": BasicHttpAuthenticator(self.get_client_id(), self.get_client_secret()).token,
|
||||||
"Content-Type": "application/x-www-form-urlencoded",
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
|
@ -35,7 +39,9 @@ class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator):
|
||||||
headers=self.build_refresh_request_headers(),
|
headers=self.build_refresh_request_headers(),
|
||||||
)
|
)
|
||||||
content = response.json()
|
content = response.json()
|
||||||
if response.status_code == 400 and content.get("error") == "invalid_grant":
|
logger.info("Refresh - response status code %s", response.status_code)
|
||||||
|
if response.status_code == 400 and content.get("error") == "invalid_request":
|
||||||
|
logger.error("Error when refreshing token: %s", content)
|
||||||
raise AirbyteTracedException(
|
raise AirbyteTracedException(
|
||||||
internal_message=content.get("error_description"),
|
internal_message=content.get("error_description"),
|
||||||
message="Refresh token is invalid or expired. Please re-authenticate to restore access to Twitter API.",
|
message="Refresh token is invalid or expired. Please re-authenticate to restore access to Twitter API.",
|
||||||
|
|
|
@ -13,6 +13,7 @@ from airbyte_cdk.sources.streams import Stream
|
||||||
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
|
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
|
||||||
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
|
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
|
||||||
from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
|
from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
|
||||||
|
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
|
||||||
|
|
||||||
from .auth import TwitterOAuth
|
from .auth import TwitterOAuth
|
||||||
|
|
||||||
|
@ -63,14 +64,13 @@ class Tweet(TwitterStream):
|
||||||
stream_slice: Mapping[str, Any] = None
|
stream_slice: Mapping[str, Any] = None
|
||||||
) -> MutableMapping[str, Any]:
|
) -> MutableMapping[str, Any]:
|
||||||
params = {
|
params = {
|
||||||
"tweet.fields" : "text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at",
|
"tweet.fields" : "text,public_metrics,author_id,referenced_tweets,created_at",
|
||||||
"max_results": 100
|
"max_results": 100
|
||||||
}
|
}
|
||||||
# Add condition later:
|
# Add condition later:
|
||||||
params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
|
params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
|
||||||
if next_page_token:
|
if next_page_token:
|
||||||
params.update(**next_page_token)
|
params.update(**next_page_token)
|
||||||
logger.info(f"DBG - query params: %s", params)
|
|
||||||
return params
|
return params
|
||||||
|
|
||||||
|
|
||||||
|
@ -82,7 +82,6 @@ class Tweet(TwitterStream):
|
||||||
if 'data' in response.json():
|
if 'data' in response.json():
|
||||||
data=response.json()['data']
|
data=response.json()['data']
|
||||||
for t in data:
|
for t in data:
|
||||||
logger.debug("DBG-T: id %s", t.get('id'))
|
|
||||||
yield t
|
yield t
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
|
@ -94,10 +93,19 @@ class TweetMetrics(HttpSubStream, Tweet):
|
||||||
stream_slice: Mapping[str, Any] = None,
|
stream_slice: Mapping[str, Any] = None,
|
||||||
next_page_token: Mapping[str, Any] = None
|
next_page_token: Mapping[str, Any] = None
|
||||||
) -> str:
|
) -> str:
|
||||||
tweet_id = stream_slice.get("parent").get("id")
|
tweet_id = stream_slice.get("id")
|
||||||
logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
|
logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
|
||||||
return f"tweets/{tweet_id}"
|
return f"tweets/{tweet_id}"
|
||||||
|
|
||||||
|
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
|
||||||
|
limit_date = datetime.today()- timedelta(31)
|
||||||
|
for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
|
||||||
|
tweet = parent_slice["parent"]
|
||||||
|
if datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ") > limit_date:
|
||||||
|
yield {"id": tweet.get('id') }
|
||||||
|
else:
|
||||||
|
logger.info("Not calling full metrics endpoint for tweet %s, tweet too old", tweet.get('id'))
|
||||||
|
|
||||||
def request_params(
|
def request_params(
|
||||||
self, stream_state: Optional[Mapping[str, Any]],
|
self, stream_state: Optional[Mapping[str, Any]],
|
||||||
stream_slice: Optional[Mapping[str, Any]] = None,
|
stream_slice: Optional[Mapping[str, Any]] = None,
|
||||||
|
@ -111,7 +119,6 @@ class TweetMetrics(HttpSubStream, Tweet):
|
||||||
return params
|
return params
|
||||||
|
|
||||||
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
||||||
logger.info("Twtter Response: %s", response.json())
|
|
||||||
if 'data' in response.json():
|
if 'data' in response.json():
|
||||||
data=response.json()['data']
|
data=response.json()['data']
|
||||||
logger.debug("DBG-FULL-T: id %s", data.get('id'))
|
logger.debug("DBG-FULL-T: id %s", data.get('id'))
|
||||||
|
|
Loading…
Reference in New Issue