twitter-fetcher: fix oauth refresh token issue

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2024-10-17 12:09:59 +02:00
parent 63990da411
commit 6de9ccb1e2
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
3 changed files with 51 additions and 4 deletions

View File

@ -10,9 +10,8 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
dockerImageTag: '0.4.1'
dockerRepository: status-im/airbyte/source-twitter-fetcher-new #TODO change before merging
# dockerRepository: status-im/airbyte/source-twitter-fetcher
dockerImageTag: '0.3.0'
dockerRepository: status-im/airbyte/source-twitter-fetcher
githubIssueLabel: source-twitter-fetcher
icon: twitter-fetcher.svg
license: MIT

View File

@ -0,0 +1,46 @@
from typing import Any, Mapping, Union
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.requests_native_auth import (
BasicHttpAuthenticator,
SingleUseRefreshTokenOauth2Authenticator,
TokenAuthenticator,
)
from airbyte_cdk.utils import AirbyteTracedException
class TwitterOAuth(SingleUseRefreshTokenOauth2Authenticator):
"""
https://developer.x.com/en/docs/authentication/oauth-2-0/user-access-token
"""
def build_refresh_request_headers(self) -> Mapping[str, Any]:
return {
"Authorization": BasicHttpAuthenticator(self.get_client_id(), self.get_client_secret()).token,
"Content-Type": "application/x-www-form-urlencoded",
}
def build_refresh_request_body(self) -> Mapping[str, Any]:
return {
"grant_type": self.get_grant_type(),
"refresh_token": self.get_refresh_token(),
}
def _get_refresh_access_token_response(self) -> Mapping[str, Any]:
response = requests.request(
method="POST",
url=self.get_token_refresh_endpoint(),
data=self.build_refresh_request_body(),
headers=self.build_refresh_request_headers(),
)
content = response.json()
if response.status_code == 400 and content.get("error") == "invalid_grant":
raise AirbyteTracedException(
internal_message=content.get("error_description"),
message="Refresh token is invalid or expired. Please re-authenticate to restore access to Twitter API.",
failure_type=FailureType.config_error,
)
response.raise_for_status()
return content

View File

@ -14,6 +14,8 @@ from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
from .auth import TwitterOAuth
logger = logging.getLogger("airbyte")
class TwitterStream(HttpStream):
@ -122,7 +124,7 @@ class SourceTwitterFetcher(AbstractSource):
return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth=SingleUseRefreshTokenOauth2Authenticator(
auth=TwitterOAuth(
config, token_refresh_endpoint="https://api.x.com/2/oauth2/token")
tweet = Tweet(
authenticator=auth,