diff --git a/source-twitter-fetcher/sample_files/configured_catalog.json b/source-twitter-fetcher/sample_files/configured_catalog.json index 8a4e3a0..ee41b80 100644 --- a/source-twitter-fetcher/sample_files/configured_catalog.json +++ b/source-twitter-fetcher/sample_files/configured_catalog.json @@ -41,6 +41,20 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "tweet_promoted", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "full_refresh", "incremental" + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" } ] } diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json index 1aefa63..338a3bb 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/account.json @@ -26,6 +26,10 @@ "verified_type": { "type": [ "null", "string" ] }, + "created_at": { + "type": [ "null", "string" ], + "format": "date-time" + }, "public_metrics": { "type": ["null", "object" ], "properties": { diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json index 38d7eec..ff2ee1f 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet.json @@ -36,7 +36,7 @@ }, "public_metrics": { "type": ["null", "object"], - "properties": { + "properties": { "retweet_count": { "type": ["null", "number"] }, @@ -52,7 +52,7 @@ "impression_count": { "type": ["null", "number"] }, - "bookmark_count": { + "bookmark_count": { "type": ["null", "number"] } } diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_metrics.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_metrics.json index 
be1ba4c..6ddaf6e 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_metrics.json +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_metrics.json @@ -23,7 +23,7 @@ "public_metrics": { "type": ["null", "object"], - "properties": { + "properties": { "retweet_count": { "type": ["null", "number"] }, @@ -39,14 +39,14 @@ "impression_count": { "type": ["null", "number"] }, - "bookmark_count": { + "bookmark_count": { "type": ["null", "number"] } } }, "non_public_metrics": { "type": ["null", "object" ], - "properties": { + "properties": { "impression_count": { "type": ["null", "number"] }, @@ -69,30 +69,7 @@ }, "organic_metrics": { "type": ["null", "object" ], - "properties": { - "impression_count": { - "type": ["null", "number"] - }, - "url_link_clicks": { - "type": ["null", "number"] - }, - "user_profile_clicks": { - "type": ["null", "number"] - }, - "retweet_count": { - "type": ["null", "number"] - }, - "reply_count": { - "type": ["null", "number"] - }, - "like_count": { - "type": ["null", "number"] - } - } - }, - "promoted_metrics": { - "type": ["null", "object" ], - "properties": { + "properties": { "impression_count": { "type": ["null", "number"] }, diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_promoted.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_promoted.json new file mode 100644 index 0000000..29f8d82 --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tweet_promoted.json @@ -0,0 +1,47 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "created_at": { + "type": ["null", "string"] + }, + "author_id": { + "type": ["null", "string"] + }, + "conversation_id": { + "type": ["null", "string"] + }, + "reply_settings": { + "type": ["null", "string"] + }, + "promoted_metrics": { + "type": ["null", "object" ], + "properties": { + 
"impression_count": {
+          "type": ["null", "number"]
+        },
+        "url_link_clicks": {
+          "type": ["null", "number"]
+        },
+        "user_profile_clicks": {
+          "type": ["null", "number"]
+        },
+        "retweet_count": {
+          "type": ["null", "number"]
+        },
+        "reply_count": {
+          "type": ["null", "number"]
+        },
+        "like_count": {
+          "type": ["null", "number"]
+        }
+      }
+    }
+  }
+}
diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py
index 6e87a32..c0d64f9 100644
--- a/source-twitter-fetcher/source_twitter_fetcher/source.py
+++ b/source-twitter-fetcher/source_twitter_fetcher/source.py
@@ -144,6 +144,72 @@ class TweetMetrics(HttpSubStream, Tweet):
             yield data
         time.sleep(2)
 
+class TweetPromoted(HttpSubStream, Tweet):
+    """Sub-stream fetching ``promoted_metrics`` for each recent parent tweet.
+
+    One request is issued per parent tweet (``tweets/{id}``); tweets older
+    than 31 days are skipped in ``stream_slices``.
+    """
+
+    primary_key = "id"
+
+    def path(
+        self, stream_state: Mapping[str, Any] = None,
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None
+    ) -> str:
+        """Return the lookup endpoint for the tweet id carried by ``stream_slice``."""
+        tweet_id = stream_slice.get("id")
+        logger.debug("Fetching tweet %s from Account id %s", tweet_id, self.account_id)
+        return f"tweets/{tweet_id}"
+
+    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        """Yield ``{"id": ...}`` for every parent tweet created within the last 31 days.
+
+        The parent stream is always read in full-refresh mode; older tweets
+        are logged and skipped.
+        """
+        from datetime import timezone
+
+        # ``created_at`` is parsed from a "...Z" (UTC) timestamp into a naive
+        # datetime, so the cutoff must be naive UTC as well; ``datetime.today()``
+        # would use the host's local time and shift the 31-day window.
+        limit_date = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=31)
+        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
+            tweet = parent_slice["parent"]
+            tweet_id = tweet.get("id")
+            created_at = datetime.strptime(tweet.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ")
+            if created_at > limit_date:
+                yield {"id": tweet_id}
+            else:
+                logger.info("Not calling promoted_metrics endpoint for tweet %s, tweet too old", tweet_id)
+
+    def request_params(
+        self, stream_state: Optional[Mapping[str, Any]],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> MutableMapping[str, Any]:
+        """Request only the ``promoted_metrics`` tweet field."""
+        params = {
+            "tweet.fields": "promoted_metrics",
+        }
+        # Add condition later:
+        logger.debug("DBG-FULL - query params: %s", params)
+        return params
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """Yield the ``data`` object of the response, if any, then pause for 2 seconds."""
+        # Decode the body once instead of re-parsing it for every key lookup.
+        payload = response.json()
+        if "data" in payload:
+            yield payload["data"]
+        elif "errors" in payload or "error" in payload:
+            # Twitter API v2 reports failures under "errors"; the singular key
+            # is still accepted so previously matching payloads keep logging.
+            logger.info("No promoted Metrics for this tweet")
+        time.sleep(2)
+
+
 # Source
 class SourceTwitterFetcher(AbstractSource):
     def check_connection(self, logger, config) -> Tuple[bool, any]:
@@ -162,8 +228,14 @@ class SourceTwitterFetcher(AbstractSource):
             account_id=config['account_id'],
             parent=tweet
         )
+        tweetPromoted = TweetPromoted(
+            authenticator=auth,
+            account_id=config['account_id'],
+            parent=tweet
+        )
         return [
             Account(authenticator=auth, account_id=config["account_id"]),
             tweet,
-            tweetMetrics
+            tweetMetrics,
+            tweetPromoted
         ]