twitter: add full history stream

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2024-09-06 16:54:34 +02:00
parent ac1b49b6e4
commit 18dece8f3e
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
6 changed files with 233 additions and 11 deletions

View File

@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
dockerImageTag: '0.2.1'
dockerImageTag: '0.3.0'
dockerRepository: status-im/airbyte/source-twitter-fetcher
githubIssueLabel: source-twitter-fetcher
icon: twitter-fetcher.svg

View File

@ -27,6 +27,20 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "tweet_full_history",
"json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object"
},
"supported_sync_modes": [
"full_refresh", "incremental"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
}
]
}

View File

@ -0,0 +1,75 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
},
"author_id": {
"type": ["null", "string"]
},
"non_public_metrics": {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": ["null", "number"]
},
"url_link_clicks": {
"type": ["null", "number"]
},
"user_profile_clicks": {
"type": ["null", "number"]
}
}
},
"organic_metrics": {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": ["null", "number"]
},
"url_link_clicks": {
"type": ["null", "number"]
},
"user_profile_clicks": {
"type": ["null", "number"]
},
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
}
}
},
"promoted_metrics": : {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": ["null", "number"]
},
"url_link_clicks": {
"type": ["null", "number"]
},
"user_profile_clicks": {
"type": ["null", "number"]
},
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
}
}
}
}
}

View File

@ -116,6 +116,29 @@
"type": ["null", "number"]
}
}
},
"promoted_metrics": {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": ["null", "number"]
},
"url_link_clicks": {
"type": ["null", "number"]
},
"user_profile_clicks": {
"type": ["null", "number"]
},
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
}
}
}
}
}

View File

@ -0,0 +1,61 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
},
"created_at": {
"type": ["null", "string"]
},
"author_id": {
"type": ["null", "string"]
},
"conversation_id": {
"type": ["null", "string"]
},
"reply_settings": {
"type": ["null", "string"]
},
"referenced_tweets": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties":{
"type": {
"type": ["null", "string"]
},
"id": {
"type": ["null", "number"]
}
}
}
},
"public_metrics": {
"type": ["null", "object"],
"properties": {
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
},
"quote_count": {
"type": ["null", "number"]
},
"impression_count": {
"type": ["null", "number"]
},
"bookmark_count": {
"type": ["null", "number"]
}
}
}
}
}

View File

@ -21,10 +21,9 @@ class TwitterStream(HttpStream):
url_base = "https://api.twitter.com/2/"
def __init__(self, start_time: str = None, stop_time: str = None, **kwargs):
def __init__(self, start_time: str = None, **kwargs):
super().__init__(**kwargs)
self.start_time = start_time
self.stop_time = stop_time;
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None
@ -64,7 +63,6 @@ class Account(TwitterStream):
# Wait to avoid reaching API limit
time.sleep(2)
class Tweet(HttpSubStream, Account):
primary_key = "id"
@ -79,8 +77,8 @@ class Tweet(HttpSubStream, Account):
return f"users/{account_id}/tweets"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
logger.debug('Looking if there is some next token to add in the query')
if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0:
logger.info('DBG-NT: %s', response.json()['meta']['next_token'])
return {"pagination_token": response.json()['meta']['next_token']}
def request_params(
@ -90,13 +88,13 @@ class Tweet(HttpSubStream, Account):
) -> MutableMapping[str, Any]:
params = {
"tweet.fields" : "text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at",
"max_results": 10
"max_results": 100
}
# Add condition later:
limit = datetime.now() - timedelta(30)
params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
if next_page_token:
params.update(**next_page_token)
logger.info(f"DBG - query params: %s", params)
return params
@ -105,6 +103,51 @@ class Tweet(HttpSubStream, Account):
if 'data' in response.json():
data=response.json()['data']
for t in data:
logger.info("DBG-T: id %s", t.get('id'))
yield t
time.sleep(2)
class TweetFullHistory(HttpSubStream, Account):
primary_key = "id"
def path(
self, stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> str:
account_id = stream_slice.get("parent").get("id")
logger.info("Account id %s", account_id)
#return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,promoted_metrics,created_at"
return f"users/{account_id}/tweets"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0:
logger.info('DBG-FULL-NT: %s', response.json()['meta']['next_token'])
return {"pagination_token": response.json()['meta']['next_token']}
def request_params(
self, stream_state: Optional[Mapping[str, Any]],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
params = {
"tweet.fields" : "text,public_metrics,author_id,referenced_tweets,created_at",
"max_results": 100
}
# Add condition later:
params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
if next_page_token:
params.update(**next_page_token)
logger.info(f"DBG-FULL - query params: %s", params)
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
logger.info("Twtter Response: %s", response.json())
if 'data' in response.json():
data=response.json()['data']
for t in data:
logger.info("DBG-FULL-T: id %s", t.get('id'))
yield t
time.sleep(2)
@ -117,12 +160,18 @@ class SourceTwitterFetcher(AbstractSource):
auth=SingleUseRefreshTokenOauth2Authenticator(
config, token_refresh_endpoint="https://api.twitter.com/2/oauth2/token")
account =Account(authenticator=auth, start_time=datetime.strptime(config['start_time'], "%Y-%m-%d"))
return [
account,
Tweet(
tweet = Tweet(
authenticator=auth,
start_time=datetime.strptime(config['start_time'], "%Y-%m-%d"),
stop_time=datetime.now().isoformat(),
parent=account
)
historycallTweet = TweetFullHistory(
authenticator=auth,
start_time=datetime.strptime(config['start_time'], "%Y-%m-%d"),
parent=account
)
return [
account,
tweet,
historycallTweet
]