From 18dece8f3e861533c0131dc4eb4aee2e9147ee2f Mon Sep 17 00:00:00 2001 From: Alexis Pentori Date: Fri, 6 Sep 2024 16:54:34 +0200 Subject: [PATCH] twitter: add full history stream Signed-off-by: Alexis Pentori --- source-twitter-fetcher/metadata.yaml | 2 +- .../sample_files/configured_catalog.json | 14 ++++ .../schemas/promoted_metrics.json | 75 +++++++++++++++++++ .../source_twitter_fetcher/schemas/tweet.json | 23 ++++++ .../schemas/tweet_full_history.json | 61 +++++++++++++++ .../source_twitter_fetcher/source.py | 69 ++++++++++++++--- 6 files changed, 233 insertions(+), 11 deletions(-) create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_metrics.json create mode 100644 source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_full_history.json diff --git a/source-twitter-fetcher/metadata.yaml b/source-twitter-fetcher/metadata.yaml index b113a22..0881a79 100644 --- a/source-twitter-fetcher/metadata.yaml +++ b/source-twitter-fetcher/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 - dockerImageTag: '0.2.1' + dockerImageTag: '0.3.0' dockerRepository: status-im/airbyte/source-twitter-fetcher githubIssueLabel: source-twitter-fetcher icon: twitter-fetcher.svg diff --git a/source-twitter-fetcher/sample_files/configured_catalog.json b/source-twitter-fetcher/sample_files/configured_catalog.json index f1b620d..ac794ca 100644 --- a/source-twitter-fetcher/sample_files/configured_catalog.json +++ b/source-twitter-fetcher/sample_files/configured_catalog.json @@ -27,6 +27,20 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "tweet_full_history", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "full_refresh", "incremental" + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" } ] } diff --git 
a/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_metrics.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_metrics.json new file mode 100644 index 0000000..43cdf73 --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/promoted_metrics.json @@ -0,0 +1,75 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "author_id": { + "type": ["null", "string"] + }, + "non_public_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": ["null", "number"] + }, + "url_link_clicks": { + "type": ["null", "number"] + }, + "user_profile_clicks": { + "type": ["null", "number"] + } + } + }, + "organic_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": ["null", "number"] + }, + "url_link_clicks": { + "type": ["null", "number"] + }, + "user_profile_clicks": { + "type": ["null", "number"] + }, + "retweet_count": { + "type": ["null", "number"] + }, + "reply_count": { + "type": ["null", "number"] + }, + "like_count": { + "type": ["null", "number"] + } + } + }, + "promoted_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": ["null", "number"] + }, + "url_link_clicks": { + "type": ["null", "number"] + }, + "user_profile_clicks": { + "type": ["null", "number"] + }, + "retweet_count": { + "type": ["null", "number"] + }, + "reply_count": { + "type": ["null", "number"] + }, + "like_count": { + "type": ["null", "number"] + } + } + } + } +} diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json index 514a15f..355ad70 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json @@ -116,6 +116,29 @@ "type":
["null", "number"] } } + }, + "promoted_metrics": { + "type": ["null", "object" ], + "properties": { + "impression_count": { + "type": ["null", "number"] + }, + "url_link_clicks": { + "type": ["null", "number"] + }, + "user_profile_clicks": { + "type": ["null", "number"] + }, + "retweet_count": { + "type": ["null", "number"] + }, + "reply_count": { + "type": ["null", "number"] + }, + "like_count": { + "type": ["null", "number"] + } + } } } } diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_full_history.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_full_history.json new file mode 100644 index 0000000..38d7eec --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_full_history.json @@ -0,0 +1,61 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "created_at": { + "type": ["null", "string"] + }, + "author_id": { + "type": ["null", "string"] + }, + "conversation_id": { + "type": ["null", "string"] + }, + "reply_settings": { + "type": ["null", "string"] + }, + "referenced_tweets": { + "type": ["null", "array"], + "items": { + "type": ["object"], + "properties":{ + "type": { + "type": ["null", "string"] + }, + "id": { + "type": ["null", "number"] + } + } + } + }, + "public_metrics": { + "type": ["null", "object"], + "properties": { + "retweet_count": { + "type": ["null", "number"] + }, + "reply_count": { + "type": ["null", "number"] + }, + "like_count": { + "type": ["null", "number"] + }, + "quote_count": { + "type": ["null", "number"] + }, + "impression_count": { + "type": ["null", "number"] + }, + "bookmark_count": { + "type": ["null", "number"] + } + } + } + } +} diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py index eb63e2b..316d36c 100644 --- 
a/source-twitter-fetcher/source_twitter_fetcher/source.py +++ b/source-twitter-fetcher/source_twitter_fetcher/source.py @@ -21,10 +21,9 @@ class TwitterStream(HttpStream): url_base = "https://api.twitter.com/2/" - def __init__(self, start_time: str = None, stop_time: str = None, **kwargs): + def __init__(self, start_time: str = None, **kwargs): super().__init__(**kwargs) self.start_time = start_time - self.stop_time = stop_time; def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None @@ -64,7 +63,6 @@ class Account(TwitterStream): # Wait to avoid reaching API limit time.sleep(2) - class Tweet(HttpSubStream, Account): primary_key = "id" @@ -79,8 +77,8 @@ class Tweet(HttpSubStream, Account): return f"users/{account_id}/tweets" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - logger.debug('Looking if there is some next token to add in the query') if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0: + logger.info('DBG-NT: %s', response.json()['meta']['next_token']) return {"pagination_token": response.json()['meta']['next_token']} def request_params( @@ -90,13 +88,13 @@ class Tweet(HttpSubStream, Account): ) -> MutableMapping[str, Any]: params = { "tweet.fields" : "text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at", - "max_results": 10 + "max_results": 100 } # Add condition later: - limit = datetime.now() - timedelta(30) params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}) if next_page_token: params.update(**next_page_token) + logger.info(f"DBG - query params: %s", params) return params @@ -105,6 +103,51 @@ class Tweet(HttpSubStream, Account): if 'data' in response.json(): data=response.json()['data'] for t in data: + logger.info("DBG-T: id %s", t.get('id')) + yield t + time.sleep(2) + +class TweetFullHistory(HttpSubStream, Account): + primary_key = 
"id" + + def path( + self, stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + account_id = stream_slice.get("parent").get("id") + logger.info("Account id %s", account_id) + #return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,promoted_metrics,created_at" + return f"users/{account_id}/tweets" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0: + logger.info('DBG-FULL-NT: %s', response.json()['meta']['next_token']) + return {"pagination_token": response.json()['meta']['next_token']} + + def request_params( + self, stream_state: Optional[Mapping[str, Any]], + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + params = { + "tweet.fields" : "text,public_metrics,author_id,referenced_tweets,created_at", + "max_results": 100 + } + # Add condition later: + params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}) + if next_page_token: + params.update(**next_page_token) + logger.info(f"DBG-FULL - query params: %s", params) + return params + + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + logger.info("Twtter Response: %s", response.json()) + if 'data' in response.json(): + data=response.json()['data'] + for t in data: + logger.info("DBG-FULL-T: id %s", t.get('id')) yield t time.sleep(2) @@ -117,12 +160,18 @@ class SourceTwitterFetcher(AbstractSource): auth=SingleUseRefreshTokenOauth2Authenticator( config, token_refresh_endpoint="https://api.twitter.com/2/oauth2/token") account =Account(authenticator=auth, start_time=datetime.strptime(config['start_time'], "%Y-%m-%d")) - return [ - account, - Tweet( + 
tweet = Tweet( authenticator=auth, start_time=datetime.strptime(config['start_time'], "%Y-%m-%d"), - stop_time=datetime.now().isoformat(), parent=account ) + historycallTweet = TweetFullHistory( + authenticator=auth, + start_time=datetime.strptime(config['start_time'], "%Y-%m-%d"), + parent=account + ) + return [ + account, + tweet, + historycallTweet ]