twitter-fetcher: update connector

Signed-off-by: Alexis Pentori <alexis@status.im>
This commit is contained in:
Alexis Pentori 2024-03-27 15:56:48 +01:00
parent 58c1ea967e
commit 11efa92811
No known key found for this signature in database
GPG Key ID: 65250D2801E47A10
10 changed files with 274 additions and 240 deletions

View File

@ -10,7 +10,7 @@ data:
connectorSubtype: api connectorSubtype: api
connectorType: source connectorType: source
definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
dockerImageTag: '0.0.1' dockerImageTag: '0.2.0'
dockerRepository: status-im/airbyte/source-twitter-fetcher dockerRepository: status-im/airbyte/source-twitter-fetcher
githubIssueLabel: source-twitter-fetcher githubIssueLabel: source-twitter-fetcher
icon: twitter-fetcher.svg icon: twitter-fetcher.svg

View File

@ -1,15 +1,10 @@
{ {
"api_key": "some_key", "credentials":{
"accounts": [ "client_id": "some-id",
"Logos_network", "client_secret": "some-secret",
"Codex_storage", "access_token": "some-access-token",
"Waku_org", "refresh_token": "some-refresh-token",
"ethnimbus", "token_expiry_date": ""
"ac1d_info", },
"HashingItOutPod", "start_time": "2024-01-01"
"vacp2p",
"InstituteFT"
],
"start_time": "2024-01-01",
"stop_time": "2024-01-26"
} }

View File

@ -2,7 +2,7 @@
"streams": [ "streams": [
{ {
"stream": { "stream": {
"name": "twitter_account_data", "name": "account",
"json_schema": { "json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#", "$schema": "http://json-schema.org/draft-04/schema#",
"type": "object" "type": "object"
@ -16,7 +16,7 @@
}, },
{ {
"stream": { "stream": {
"name": "twitter_tweet", "name": "tweet",
"json_schema": { "json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#", "$schema": "http://json-schema.org/draft-04/schema#",
"type": "object" "type": "object"
@ -28,6 +28,5 @@
"sync_mode": "incremental", "sync_mode": "incremental",
"destination_sync_mode": "overwrite" "destination_sync_mode": "overwrite"
} }
] ]
} }

View File

@ -0,0 +1,35 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": [ "null", "string" ]
},
"name": {
"type": [ "null", "string" ]
},
"username": {
"type": [ "null", "string" ]
},
"public_metrics": {
"type": ["null", "object" ],
"properties": {
"tweet_count": {
"type": [ "null", "number" ]
},
"like_count": {
"type": [ "null", "number" ]
},
"following_count": {
"type": [ "null", "number" ]
},
"followers_count": {
"type": [ "null", "number" ]
},
"listed_count": {
"type": [ "null", "number" ]
}
}
}
}
}

View File

@ -0,0 +1,33 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": [ "null", "string" ]
},
"name": {
"type": [ "null", "string" ]
},
"username": {
"type": [ "null", "string" ]
},
"created_at": {
"type": [ "null", "string" ]
},
"location": {
"type": [ "null", "string" ]
},
"url": {
"type": [ "null", "string" ]
},
"description": {
"type": [ "null", "string" ]
},
"verified": {
"type": [ "null", "boolean" ]
},
"verified_type": {
"type": [ "null", "string" ]
}
}
}

View File

@ -0,0 +1,98 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": [ "null", "string"]
},
"text": {
"type": [ "null", "string"]
},
"created_at": {
"type": [ "null", "string"]
},
"author_id": {
"type": [ "null", "string"]
},
"conversation_id": {
"type": [ "null", "string"]
},
"reply_settings": {
"type": ["null", "string"]
},
"referenced_tweets": {
"type": [ "null", "array" ],
"items": {
"type": ["object"],
"properties":{
"type": {
"type": [ "null", "string" ]
},
"id": {
"type": [ "null", "string" ]
}
}
}
},
"public_metrics": {
"type": ["null", "object" ],
"properties": {
"retweet_count": {
"type": [ "null", "number" ]
},
"reply_count": {
"type": [ "null", "number" ]
},
"like_count": {
"type": [ "null", "number" ]
},
"quote_count": {
"type": [ "null", "number" ]
},
"impression_count": {
"type": [ "null", "number" ]
},
"bookmark_count": {
"type": [ "null", "number" ]
}
}
},
"non_public_metrics": {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": [ "null", "number" ]
},
"url_link_clicks": {
"type": [ "null", "number" ]
},
"user_profile_clicks": {
"type": [ "null", "number" ]
}
}
},
"organic_metrics": {
"type": ["null", "object" ],
"properties": {
"impression_count": {
"type": [ "null", "number" ]
},
"url_link_clicks": {
"type": [ "null", "number" ]
},
"user_profile_clicks": {
"type": [ "null", "number" ]
},
"retweet_count": {
"type": [ "null", "number" ]
},
"reply_count": {
"type": [ "null", "number" ]
},
"like_count": {
"type": [ "null", "number" ]
}
}
}
}
}

View File

@ -1,54 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"account_id": {
"type": [
"null",
"string"
]
},
"account_name": {
"type": [
"null",
"string"
]
},
"username": {
"type": [
"null",
"string"
]
},
"tweet_count": {
"type": [
"null",
"number"
]
},
"like_count": {
"type": [
"null",
"number"
]
},
"following_count": {
"type": [
"null",
"number"
]
},
"follower_count": {
"type": [
"null",
"number"
]
},
"listed_count": {
"type": [
"null",
"number"
]
}
}
}

View File

@ -1,48 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": [
"null",
"string"
]
},
"id": {
"created_at": [
"null",
"string"
]
},
"retweet_count": {
"type": [
"null",
"number"
]
},
"reply_count": {
"type": [
"null",
"number"
]
},
"like_count": {
"type": [
"null",
"number"
]
},
"quote_count": {
"type": [
"null",
"number"
]
},
"referenced_tweets": {
"type": [
"null",
"string"
]
}
}
}

View File

@ -12,7 +12,8 @@ from datetime import datetime
from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
logger = logging.getLogger("airbyte") logger = logging.getLogger("airbyte")
@ -20,44 +21,29 @@ class TwitterStream(HttpStream):
url_base = "https://api.twitter.com/2/" url_base = "https://api.twitter.com/2/"
def __init__(self, api_key: str=None, accounts: List=None, start_time: str = None, stop_time: str = None, **kwargs): def __init__(self, start_time: str = None, stop_time: str = None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.api_key = api_key
self.accounts = accounts
self.start_time = start_time self.start_time = start_time
self.stop_time = stop_time; self.stop_time = stop_time;
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return None return None
class TwitterAccountData(TwitterStream):
primary_key = "account_id" class Account(TwitterStream):
primary_key = "id"
@property @property
def use_cache(self) -> bool: def use_cache(self) -> bool:
return True return True
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for account in self.accounts:
yield {
"name": account
}
def path( def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None self, stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> str: ) -> str:
return f"users/by/username/{stream_slice['name']}?user.fields=public_metrics" return f"users/me?user.fields=public_metrics"
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"Authorization" : f"Bearer {self.api_key}",
"User-Agent": "v2RecentSearchPython"
}
def parse_response( def parse_response(
self, self,
@ -65,59 +51,33 @@ class TwitterAccountData(TwitterStream):
stream_slice: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
**kwargs **kwargs
) -> Iterable[Mapping]: ) -> Iterable[Mapping]:
logger.info("Getting data of %s account", stream_slice['name'])
logger.info("Response: %s", response.json()) logger.info("Response: %s", response.json())
data=response.json()['data'] data=response.json()['data']
yield { yield data
"account_id": data['id'], # Wait to avoid reaching API limit
"username": data['username'],
"account_name": data['name'],
"tweet_count": data['public_metrics']['tweet_count'],
"like_count": data['public_metrics']['like_count'],
"following_count": data['public_metrics']['following_count'],
"follower_count": data['public_metrics']['followers_count'],
"listed_count": data['public_metrics']['listed_count'],
}
time.sleep(2) time.sleep(2)
class TwitterTweet(HttpSubStream, TwitterAccountData): class Tweet(HttpSubStream, Account):
#TODO: See how to get the account ID primary_key = "id"
primary_key = ""
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(TwitterAccountData(**kwargs),**kwargs) super().__init__(Account(**kwargs),**kwargs)
def path( def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None self, stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> str: ) -> str:
account_id = stream_slice.get("parent").get("account_id") account_id = stream_slice.get("parent").get("id")
return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,author_id,referenced_tweets,created_at&start_time={self.start_time}" logger.info("Account id %s", account_id)
return f"users/{account_id}/tweets?tweet.fields=text,public_metrics,non_public_metrics,organic_metrics,author_id,referenced_tweets,created_at"
def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {
"Authorization" : f"Bearer {self.api_key}",
"User-Agent": "v2RecentSearchPython"
}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
logger.debug("Twtter Response: %s", response.json())
data=response.json()['data'] data=response.json()['data']
logger.info("Response: %s", response.json())
referenced_tweets=""
for t in data: for t in data:
if "referenced_tweets" in t: yield t
for rt in t.get('referenced_tweets'):
referenced_tweets += f"{rt.get('type')}:{rt.get('id')};"
yield {
"id": t['id'],
"created_at": t.get('created_at'),
"retweet_count": t.get('public_metrics').get('retweet_count'),
"reply_count": t.get('public_metrics').get('reply_count'),
"like_count": t.get('public_metrics').get('like_count'),
"quote_count": t.get('public_metrics').get('quote_count'),
"referenced_tweets": referenced_tweets
}
time.sleep(2) time.sleep(2)
# Source # Source
@ -126,13 +86,11 @@ class SourceTwitterFetcher(AbstractSource):
return True, None return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]: def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth=SingleUseRefreshTokenOauth2Authenticator(
config, token_refresh_endpoint="https://api.twitter.com/2/oauth2/token")
return [ return [
TwitterAccountData( Account(authenticator=auth),
api_key=config['api_key'], Tweet(authenticator=auth, start_time=config['start_time'],
accounts=config['accounts']), stop_time=datetime.now().isoformat()
TwitterTweet( )
api_key=config['api_key'],
accounts=config['accounts'],
start_time=config['start_time'],
stop_time=datetime.now().isoformat())
] ]

View File

@ -4,19 +4,37 @@ connectionSpecification:
title: Twitter Fetcher title: Twitter Fetcher
type: object type: object
required: required:
- api_key - credentials
- accounts
- start_time - start_time
properties: properties:
api_key: credentials:
title: Twitter Dev account Credentials
type: object
properties:
client_id:
title: client_id
type: string type: string
description: "API Key to authentify to twitter" description: "Client ID of Twitter Application"
airbyte_secret: true airbyte_secret: true
accounts: client_secret:
type: array title: client_secret
description: "List of accounts needing to be extracted"
items:
type: string type: string
description: "Client secret of Twitter Application"
airbyte_secret: true
access_token:
title: access_token
type: string
description: "Access Token of Twitter Dev Account link"
airbyte_secret: true
refresh_token:
title: refresh_token
type: string
description: "Refresh Token of Twitter Dev Account link"
airbyte_secret: true
token_expiry_date:
title: token_expiry_date
type: string
description: "Expiry date of the Access Token of Twitter Dev Account link"
start_time: start_time:
type: string type: string
description: "Start date of fetching data" description: "Start date of fetching data"