mirror of https://github.com/vacp2p/10ksim.git

Dump metric data to .csv

parent 8553fe0f4f
commit 6c56b19e2e

main.py (4 lines changed)
@@ -14,8 +14,8 @@ def main():
 
     v1 = client.CoreV1Api()
 
-    scrapper = Scrapper(url, namespace, metrics)
-    scrapper.make_queries()
+    scrapper = Scrapper(url, namespace, "test/", metrics)
+    scrapper.query_and_dump_metrics()
 
 
 if __name__ == '__main__':
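The call site now passes an output folder (hard-coded here to "test/") and uses the renamed entry point. A minimal sketch of the new API in isolation; the url, namespace, and metrics values below are hypothetical stand-ins, since main.py's actual setup sits outside this hunk:

    # hypothetical values -- the real ones come from main.py's setup code
    url = 'http://localhost:9090/'                      # Prometheus endpoint (assumed)
    namespace = 'test-namespace'                        # Kubernetes namespace to filter on (assumed)
    metrics = ['container_network_receive_bytes_total']

    scrapper = Scrapper(url, namespace, 'test/', metrics)  # 'test/' is the new out_folder argument
    scrapper.query_and_dump_metrics()                      # queries each metric, writes test/<metric>.csv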
requirements.txt (BIN, binary file not shown)
src/metrics/scrapper.py

@@ -1,32 +1,121 @@
 # Python Imports
 import requests
-from typing import List
+import logging
+import pandas as pd
+from itertools import chain
+from typing import List, Dict
+from requests import Response
+from pathlib import Path
 
 # Project Imports
+import src.logging.logger
 from src.metrics import scrape_utils
+from result import Ok, Err, Result
+
+logger = logging.getLogger(__name__)
 
 
 class Scrapper:
-    def __init__(self, url: str, namespace: str, metrics: List):
+    def __init__(self, url: str, namespace: str, out_folder: str, metrics: List):
         self._url = url
         self._namespace = namespace
+        self._out_folder = out_folder
         self._metrics = metrics
         # TODO make interval match value in cluster
-        self._template = "irate($metric{namespace=$namespace}[3m])"
+        self._template = 'irate($metric{namespace=$namespace}[3m])'
 
-    def make_queries(self):
+    def query_and_dump_metrics(self):
         for metric in self._metrics:
-            query = self._template.replace("$metric", metric)
-            query = query.replace("$namespace", self._namespace)
-            promql = scrape_utils.create_promql(self._url, query, 1, 60)
-            logger.info(f"Promql: {promql}")
-            response = requests.get(promql)
-            logger.info(f"Response: {response.status_code}")
+            logger.info(f'Querying {metric}')
+            promql = self._create_query(metric)
+
+            result = self._make_query(promql)
+            if result.is_err():
+                logger.warning(f'Error querying {metric}. {result.err_value}')
+
+            response = result.ok_value
+            logger.info(f'Response: {response.status_code}')
+            data = response.json()['data']
+
+            logger.info(f'Dumping {metric} data to .csv')
+            self._dump_data(metric, data)
+
+    def _create_query(self, metric: str) -> str:
+        query = self._template.replace('$metric', metric)
+        query = query.replace('$namespace', self._namespace)
+        promql = scrape_utils.create_promql(self._url, query, 1, 60)
+
+        return promql
+
+    def _make_query(self, promql: str) -> Result[Response, str]:
+        try:
+            response = requests.get(promql, timeout=30)
+        except requests.exceptions.Timeout:
+            return Err(f'Timeout error.')
+
+        if response.ok:
+            return Ok(response)
+        return Err(f'Error in query. Status code {response.status_code}. {response.content}')
+
+    def _dump_data(self, metric: str, data: Dict):
+        df = self._create_dataframe_from_data(data)
+        df = self._sort_dataframe(df)
+
+        result = self._prepare_path(metric)
+        if result.is_err():
+            logger.error(f'{result.err_value}')
+            exit(1)
+
+        df.to_csv(result.ok_value)
+        logger.info(f'{metric} data dumped')
+
+    def _prepare_path(self, metric: str) -> Result[Path, str]:
+        output_file = f'{metric}.csv'
+        output_dir = Path(self._out_folder + output_file)
+
+        try:
+            output_dir.mkdir(parents=True)
+        except OSError as e:
+            return Err(f'Error creating {output_dir}. {e}')
+
+        return Ok(output_dir)
+
+    def _create_dataframe_from_data(self, data: Dict) -> pd.DataFrame:
+        final_df = pd.DataFrame()
+        for pod_result_dict in data['result']:
+            column_name = pod_result_dict['metric']['pod'] + '_' + pod_result_dict['metric']['node']
+            values = pod_result_dict['values']
+
+            pod_df = self._create_pod_df(column_name, values)
+
+            final_df = pd.merge(final_df, pod_df, how='outer', left_index=True, right_index=True)
+
+        return final_df
+
+    def _sort_dataframe(self, df) -> pd.DataFrame:
+        columns = self._order(df.columns.tolist())
+        df = df[columns]
+
+        return df
+
+    def _create_pod_df(self, column_name, values) -> pd.DataFrame:
+        pod_df = pd.DataFrame(values, columns=['Unix Timestamp', column_name])
+        pod_df['Unix Timestamp'] = pd.to_datetime(pod_df['Unix Timestamp'], unit='s')
+        pod_df.set_index('Unix Timestamp', inplace=True)
+
+        return pod_df
+
+    # TODO this depends on pods name assigned in deployment
+    def _order(self, column_names: List) -> List:
+        def get_default_format_id(val):
+            return int(val.split('-')[1].split('_')[0])
+
+        columns_without_nodes = []
+        columns_without_bootstrap = []
+        nodes = [item if item.startswith('nodes') else columns_without_nodes.append(item)
+                 for item in column_names]
+        bootstrap = [item if item.startswith('bootstrap') else columns_without_bootstrap.append(item)
+                     for item in columns_without_nodes]
+        nodes.sort(key=get_default_format_id)
+        bootstrap.sort(key=get_default_format_id)
+
+        return list(chain(columns_without_bootstrap, bootstrap, nodes))
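To make the query construction concrete: with metric = 'container_cpu_usage_seconds_total' and namespace = 'test-namespace' (hypothetical values), the template in _create_query expands to

    irate(container_cpu_usage_seconds_total{namespace=test-namespace}[3m])

before scrape_utils.create_promql turns it into a query URL; the meaning of the trailing 1 and 60 arguments lives in scrape_utils, outside this diff. Note the expanded label value is unquoted; PromQL requires namespace="test-namespace", so either the namespace string must already carry its own quotes or create_promql must add them.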
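One behavioural caveat in query_and_dump_metrics as committed: when _make_query returns an Err, the loop logs a warning but still falls through to result.ok_value, which Err does not define, so the iteration crashes instead of skipping the failed metric. A minimal guarded sketch, reusing the diff's own names:

    result = self._make_query(promql)
    if result.is_err():
        logger.warning(f'Error querying {metric}. {result.err_value}')
        continue  # skip this metric; an Err has no ok_value attribute

    response = result.ok_value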
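Similarly, _prepare_path calls mkdir on the full CSV path, so it creates a directory literally named '<metric>.csv' (and returns Err on any rerun, since the path then already exists), after which df.to_csv on a directory fails. A sketch of the presumably intended behaviour, creating the parent folder but returning the file path:

    def _prepare_path(self, metric: str) -> Result[Path, str]:
        output_path = Path(self._out_folder) / f'{metric}.csv'
        try:
            # create the output folder (not the file) and tolerate reruns
            output_path.parent.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            return Err(f'Error creating {output_path.parent}. {e}')
        return Ok(output_path)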
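_order uses list comprehensions for their append side effects, which leaves None entries in nodes and bootstrap (list.append returns None for every non-matching item), and sort(key=get_default_format_id) then fails on those Nones. A sketch of the same grouping written with plain filters, assuming the 'nodes-N...'/'bootstrap-N...' column naming the TODO comment alludes to:

    def _order(self, column_names: List) -> List:
        def get_default_format_id(val):
            # e.g. 'nodes-3_worker-a' -> 3
            return int(val.split('-')[1].split('_')[0])

        nodes = sorted((c for c in column_names if c.startswith('nodes')),
                       key=get_default_format_id)
        bootstrap = sorted((c for c in column_names if c.startswith('bootstrap')),
                           key=get_default_format_id)
        rest = [c for c in column_names if not c.startswith(('nodes', 'bootstrap'))]

        return list(chain(rest, bootstrap, nodes))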
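For reference, _create_dataframe_from_data consumes the standard Prometheus range-query payload, plus the assumption that every series carries 'pod' and 'node' labels. Roughly the shape involved, with illustrative values:

    data = {
        'result': [
            {
                'metric': {'pod': 'nodes-1', 'node': 'worker-a'},
                'values': [[1690000000, '0.5'], [1690000060, '0.7']],  # [unix ts, value-as-string]
            },
        ],
    }

Each series becomes one column named '<pod>_<node>', indexed by the timestamp, and the per-series frames are outer-merged so pods with differing sample times still align in the final CSV.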