From 11efa928114c01a57fe9ebcb07ba0970d5b86b86 Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Wed, 27 Mar 2024 15:56:48 +0100 Subject: [PATCH] twitter-fetcher: update connector Signed-off-by: Alexis Pentori --- source-twitter-fetcher/metadata.yaml | 2 +- .../sample_files/config-example.json | 21 +-- .../sample_files/configured_catalog.json | 5 +- .../schemas/account.json | 35 ++++ .../schemas/follower.json | 33 ++++ .../source_twitter_fetcher/schemas/tweet.json | 98 ++++++++++ .../schemas/twitter_account_data.json | 54 ------ .../schemas/twitter_tweet.json | 48 ----- .../source_twitter_fetcher/source.py | 172 +++++++----------- .../source_twitter_fetcher/spec.yaml | 46 +++-- 10 files changed, 274 insertions(+), 240 deletions(-) create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/account.json create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/follower.json create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json delete mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_account_data.json delete mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_tweet.json diff --git a/source-twitter-fetcher/metadata.yaml b/source-twitter-fetcher/metadata.yaml index 7401e30..2164538 100644 --- a/source-twitter-fetcher/metadata.yaml +++ b/source-twitter-fetcher/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 - dockerImageTag: '0.0.1' + dockerImageTag: '0.2.0' dockerRepository: status-im/airbyte/source-twitter-fetcher githubIssueLabel: source-twitter-fetcher icon: twitter-fetcher.svg diff --git a/source-twitter-fetcher/sample_files/config-example.json b/source-twitter-fetcher/sample_files/config-example.json index 1461bf9..32440ae 100644 --- a/source-twitter-fetcher/sample_files/config-example.json +++ b/source-twitter-fetcher/sample_files/config-example.json @@ -1,15 +1,10 @@ { - "api_key": "some_key", - "accounts": [ - "Logos_network", - "Codex_storage", - "Waku_org", - "ethnimbus", - "ac1d_info", - "HashingItOutPod", - "vacp2p", - "InstituteFT" - ], - "start_time": "2024-01-01", - "stop_time": "2024-01-26" + "credentials":{ + "client_id": "some-id", + "client_secret": "some-secret", + "access_token": "some-access-token", + "refresh_token": "some-refresh-token", + "token_expiry_date": "" + }, + "start_time": "2024-01-01" } diff --git a/source-twitter-fetcher/sample_files/configured_catalog.json b/source-twitter-fetcher/sample_files/configured_catalog.json index 47475f2..f1b620d 100644 --- a/source-twitter-fetcher/sample_files/configured_catalog.json +++ b/source-twitter-fetcher/sample_files/configured_catalog.json @@ -2,7 +2,7 @@ "streams": [ { "stream": { - "name": "twitter_account_data", + "name": "account", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" @@ -16,7 +16,7 @@ }, { "stream": { - "name": "twitter_tweet", + "name": "tweet", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" @@ -28,6 +28,5 @@ "sync_mode": "incremental", "destination_sync_mode": "overwrite" } - ] } diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json new file mode 100644 index 0000000..d3a65f5 --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json @@ -0,0 +1,35 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ "null", "string" ] + }, + "name": { + "type": [ "null", "string" ] + }, + "username": { + "type": [ "null", "string" ] + }, + "public_metrics": { + "type": ["null", "object" ], + "properties": { + "tweet_count": { + "type": [ "null", "number" ] + }, + "like_count": { + "type": [ "null", "number" ] + }, + "following_count": { + "type": [ "null", "number" ] + }, + "follower_count": { + "type": [ "null", "number" ] + }, + "listed_count": { + "type": [ "null", "number" ] + } + } + } + } +} diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/follower.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/follower.json new file mode 100644 index 0000000..fd5028a --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/follower.json @@ -0,0 +1,33 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ "null", "string" ] + }, + "name": { + "type": [ "null", "string" ] + }, + "username": { + "type": [ "null", "string" ] + }, + "created_at": { + "type": [ "null", "string" ] + }, + "location": { + "type": [ "null", "string" ] + }, + "url": { + "type": [ "null", "string" ] + }, + "description": { + "type": [ "null", "string" ] + }, + "verified": { + "type": [ "null", "boolean" ] + }, + "verified_type": { + "type": [ "null", "string" ] + } + } +} diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json new file mode 100644 index 0000000..9548fb6 --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json @@ -0,0 +1,98 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": [ "null", "string"] + }, + "text": { + "type": [ "null", "string"] + }, + "created_at": { + "type": [ "null", "string"] + }, + "author_id": { + "type": [ "null", "string"] + }, + "conversation_id": { + "type": [ "null", "string"] + }, + "reply_settings": { + "type": ["null", "string"] + }, + "referenced_tweets": { + "type": [ "null", "array" ], + "items": { + "type": ["object"], + "properties":{ + "type": { + "type": [ "null", "string" ] + }, + "id": { + "type": [ "null", "number" ] + } + } + } + }, + "public_metrics": { + "type": ["null", "object" ], + "properties": { + "retweet_count": { + "type": [ "null", "number" ] + }, + "reply_count": { + "type": [ "null", "number" ] + }, + "like_count": { + "type": [ "null", "number" ] + }, + "quote_count": { + "type": [ "null", "number" ] + }, + "impression_count": { + "type": [ "null", "number" ] + }, + "bookmark_count": { + "type": [ "null", "number" ] + } + } + }, + "non_public_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": [ "null", "number" ] + }, + "url_link_clicks": { + "type": [ "null", "number" ] + }, + "user_profile_clicks": { + "type": [ "null", "number" ] + } + } + }, + "organic_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": [ "null", "number" ] + }, + "url_link_clicks": { + "type": [ "null", "number" ] + }, + "user_profile_clicks": { + "type": [ "null", "number" ] + }, + "retweet_count": { + "type": [ "null", "number" ] + }, + "reply_count": { + "type": [ "null", "number" ] + }, + "like_count": { + "type": [ "null", "number" ] + } + } + } + } +} diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_account_data.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_account_data.json deleted file mode 100644 index f10226a..0000000 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_account_data.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "account_id": { - "type": [ - "null", - "string" - ] - }, - "account_name": { - "type": [ - "null", - "string" - ] - }, - "username": { - "type": [ - "null", - "string" - ] - }, - "tweet_count": { - "type": [ - "null", - "number" - ] - }, - "like_count": { - "type": [ - "null", - "number" - ] - }, - "following_count": { - "type": [ - "null", - "number" - ] - }, - "follower_count": { - "type": [ - "null", - "number" - ] - }, - "listed_count": { - "type": [ - "null", - "number" - ] - } - } -} diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_tweet.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_tweet.json deleted file mode 100644 index 586a5e2..0000000 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/twitter_tweet.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": [ - "null", - "string" - ] - }, - "id": { - "created_at": [ - "null", - "string" - ] - }, - "retweet_count": { - "type": [ - "null", - "number" - ] - }, - "reply_count": { - "type": [ - "null", - "number" - ] - }, - "like_count": { - "type": [ - "null", - "number" - ] - }, - "quote_count": { - "type": [ - "null", - "number" - ] - }, - "referenced_tweets": { - "type": [ - "null", - "string" - ] - } - } -} diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py index c615987..ef71a08 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/source.py +++ b/source-twitter-fetcher/source_twitter_fetcher/source.py @@ -12,127 +12,85 @@ from datetime import datetime from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator logger = logging.getLogger("airbyte") class TwitterStream(HttpStream): - url_base = "https://api.twitter.com/2/" + url_base = "https://api.twitter.com/2/" - def __init__(self, api_key: str=None, accounts: List=None, start_time: str = None, stop_time: str = None, **kwargs): - super().__init__(**kwargs) - self.api_key = api_key - self.accounts = accounts - self.start_time = start_time - self.stop_time = stop_time; + def __init__(self, start_time: str = None, stop_time: str = None, **kwargs): + super().__init__(**kwargs) + self.start_time = start_time + self.stop_time = stop_time; - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return None - -class TwitterAccountData(TwitterStream): - - primary_key = "account_id" - - @property - def use_cache(self) -> bool: - return True + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - for account in self.accounts: - yield { - "name": account - } +class Account(TwitterStream): + + primary_key = "id" + + @property + def use_cache(self) -> bool: + return True + + def path( + self, stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + return f"users/me?user.fields=public_metrics" + + def parse_response( + self, + response: requests.Response, + stream_slice: Mapping[str, Any] = None, + **kwargs + ) -> Iterable[Mapping]: + logger.info("Response: %s", response.json()) + data=response.json()['data'] + yield data + # Wait to avoid reaching API limit + time.sleep(2) - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - return f"users/by/username/{stream_slice['name']}?user.fields=public_metrics" +class Tweet(HttpSubStream, Account): + primary_key = "id" - def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - return { - "Authorization" : f"Bearer {self.api_key}", - "User-Agent": "v2RecentSearchPython" - } + def __init__(self, **kwargs): + super().__init__(Account(**kwargs),**kwargs) - def parse_response( - self, - response: requests.Response, - stream_slice: Mapping[str, Any] = None, - **kwargs - ) -> Iterable[Mapping]: - logger.info("Getting data of %s account", stream_slice['name']) - logger.info("Response: %s", response.json()) - data=response.json()['data'] - yield { - "account_id": data['id'], - "username": data['username'], - "account_name": data['name'], - "tweet_count": data['public_metrics']['tweet_count'], - "like_count": data['public_metrics']['like_count'], - "following_count": data['public_metrics']['following_count'], - "follower_count": data['public_metrics']['followers_count'], - "listed_count": data['public_metrics']['listed_count'], - } - time.sleep(2) - - -class TwitterTweet(HttpSubStream, TwitterAccountData): - #TODO: See how to get the account ID - primary_key = "" - def __init__(self, **kwargs): - super().__init__(TwitterAccountData(**kwargs),**kwargs) - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - account_id = stream_slice.get("parent").get("account_id") - return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,author_id,referenced_tweets,created_at&start_time={self.start_time}" - - def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - return { - "Authorization" : f"Bearer {self.api_key}", - "User-Agent": "v2RecentSearchPython" - } - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - data=response.json()['data'] - logger.info("Response: %s", response.json()) - referenced_tweets="" - for t in data: - if "referenced_tweets" in t: - for rt in t.get('referenced_tweets'): - referenced_tweets += f"{rt.get('type')}:{rt.get('id')};" - yield { - "id": t['id'], - "created_at": t.get('created_at'), - "retweet_count": t.get('public_metrics').get('retweet_count'), - "reply_count": t.get('public_metrics').get('reply_count'), - "like_count": t.get('public_metrics').get('like_count'), - "quote_count": t.get('public_metrics').get('quote_count'), - "referenced_tweets": referenced_tweets - } - time.sleep(2) + def path( + self, stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + account_id = stream_slice.get("parent").get("id") + logger.info("Account id %s", account_id) + return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + logger.debug("Twtter Response: %s", response.json()) + data=response.json()['data'] + for t in data: + yield t + time.sleep(2) # Source class SourceTwitterFetcher(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: - return True, None + def check_connection(self, logger, config) -> Tuple[bool, any]: + return True, None - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - return [ - TwitterAccountData( - api_key=config['api_key'], - accounts=config['accounts']), - TwitterTweet( - api_key=config['api_key'], - accounts=config['accounts'], - start_time=config['start_time'], - stop_time=datetime.now().isoformat()) - ] + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + auth=SingleUseRefreshTokenOauth2Authenticator( + config, token_refresh_endpoint="https://api.twitter.com/2/oauth2/token") + return [ + Account(authenticator=auth), + Tweet(authenticator=auth, start_time=config['start_time'], + stop_time=datetime.now().isoformat() + ) + ] diff --git a/source-twitter-fetcher/source_twitter_fetcher/spec.yaml b/source-twitter-fetcher/source_twitter_fetcher/spec.yaml index 555f6ca..b0517f7 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/spec.yaml +++ b/source-twitter-fetcher/source_twitter_fetcher/spec.yaml @@ -4,20 +4,38 @@ connectionSpecification: title: Twitter Fetcher type: object required: - - api_key - - accounts + - credentials - start_time properties: - api_key: - type: string - description: "API Key to authentify to twitter" - airbyte_secret: true - accounts: - type: array - description: "List of accounts needing to be extracted" - items: - type: string + credentials: + title: Twitter Dev account Credentials + type: object + properties: + client_id: + title: client_id + type: string + description: "Client ID of Twitter Application" + airbyte_secret: true + client_secret: + title: client_secret + type: string + description: "Client secret of Twitter Application" + airbyte_secret: true + access_token: + title: access_token + type: string + description: "Access Token of Twitter Dev Account link" + airbyte_secret: true + refresh_token: + title: refresh_token + type: string + description: "Access Token of Twitter Dev Account link" + airbyte_secret: true + token_expiry_date: + title: token_expiry_date + type: string + description: "Access Token of Twitter Dev Account link" start_time: - type: string - description: "Start date of fetching data" - format: date-time + type: string + description: "Start date of fetching data" + format: date-time