Victoria logs optimization (#36)

* Fix get_files_from_folder_path when using '*'

* Add _get_number_nodes function

* Add option to concurrently query for logs

* Comment sleep in fetch data

* Change logging

* Add function to dump csv

* Merge dfs and dump

* Change path file name to path_u

* Change pattern in extension function

* Return error when single queries go through catch

* Enhance result returning

* Add extension when reading files to not have problems with already existing ones

* Add intermediate folder for message summary

* Fix folder path

* Parallelize dump log information

* Change read logs parallelization

* Split functions for reading logs concurrently

* Add result error checking

* Enhance typing

* Change query to get nº nodes

* Improve writing csv

* Sort dataframes

* Better handle result in get number of nodes

* Improve result error response

* Add missing pandas as pd import

* Change log file name to peer_id

* Logger changes

* Add file name info to logging

* Address PR comment.

* Improved logging
This commit is contained in:
Alberto Soutullo 2024-08-30 13:52:06 +02:00 committed by GitHub
parent 94e838b95e
commit 1340441921
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 202 additions and 52 deletions

View File

@ -4,7 +4,7 @@ import pandas as pd
from typing import List, Optional from typing import List, Optional
# Project Imports # Project Imports
from src.utils import path from src.utils import path_utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -15,7 +15,7 @@ class DataHandler:
self._dataframe = pd.DataFrame() self._dataframe = pd.DataFrame()
def dump_dataframe(self, dump_path: str): def dump_dataframe(self, dump_path: str):
result = path.prepare_path(dump_path) result = path_utils.prepare_path(dump_path)
if result.is_err(): if result.is_err():
logger.error(f'{result.err_value}') logger.error(f'{result.err_value}')
exit(1) exit(1)

View File

@ -3,8 +3,9 @@ import json
import logging import logging
import re import re
import time import time
import pandas as pd
import requests import requests
from typing import Dict, List, Optional from typing import Dict, List, Optional, Iterator
from httpx import Response from httpx import Response
from result import Result, Ok, Err from result import Result, Ok, Err
@ -22,8 +23,8 @@ class VictoriaReader:
self.logs = [] self.logs = []
def _fetch_data(self, headers: Dict, params: Dict): def _fetch_data(self, headers: Dict, params: Dict):
logger.info(f'Fetching {params}') logger.debug(f'Fetching {params}')
time.sleep(5) # time.sleep(5)
with requests.post(self._config['url'], headers=headers, params=params, stream=True) as response: with requests.post(self._config['url'], headers=headers, params=params, stream=True) as response:
for line in response.iter_lines(): for line in response.iter_lines():
if line: if line:
@ -33,8 +34,7 @@ class VictoriaReader:
logger.info(line) logger.info(line)
exit() exit()
self.logs.append((parsed_object['_msg'], parsed_object['kubernetes_pod_name'])) self.logs.append((parsed_object['_msg'], parsed_object['kubernetes_pod_name']))
logger.debug("line added") logger.debug(f'Fetched {len(self.logs)} messages')
logger.info(f'Fetched {len(self.logs)} messages')
def _make_queries(self) -> List: def _make_queries(self) -> List:
results = [[] for _ in self._tracer.patterns] results = [[] for _ in self._tracer.patterns]
@ -47,13 +47,13 @@ class VictoriaReader:
match_as_list = list(match.groups()) match_as_list = list(match.groups())
match_as_list.append(log_line[1]) match_as_list.append(log_line[1])
results[i].append(match_as_list) results[i].append(match_as_list)
logger.info('Fetched lines parsed with pattern') # logger.debug('Fetched lines parsed with pattern')
self.logs.clear() self.logs.clear()
return results return results
def read(self) -> List: def read(self) -> List[pd.DataFrame]:
logger.info(f'Reading {self._config["url"]}') # logger.info(f'Reading {self._config["url"]}')
results = self._make_queries() results = self._make_queries()
dfs = self._tracer.trace(results) dfs = self._tracer.trace(results)
@ -74,3 +74,20 @@ class VictoriaReader:
logger.error(f'Failed to decode JSON: {e}') logger.error(f'Failed to decode JSON: {e}')
logger.error(f'Response content: {response.content}') logger.error(f'Response content: {response.content}')
return Err(response)
def multi_query_info(self) -> Result[Iterator, str]:
time.sleep(10)
response = requests.post(self._config['url'], headers=self._config['headers'], params=self._config['params'])
if response.status_code != 200:
logger.error(f'Request failed with status code: {response.status_code}')
return Err(response.text)
try:
data = response.iter_lines()
return Ok(data)
except json.decoder.JSONDecodeError as e:
logger.error(f'Failed to decode JSON: {e}')
logger.error(f'Response content: {response.content}')
return Err(response.text)

View File

@ -8,7 +8,7 @@ from result import Ok, Err
# Project Imports # Project Imports
from src.mesh_analysis.tracers.message_tracer import MessageTracer from src.mesh_analysis.tracers.message_tracer import MessageTracer
from src.utils import file_utils, path from src.utils import path_utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -37,7 +37,7 @@ class WakuTracer(MessageTracer):
self._patterns.append(r'(.*)') self._patterns.append(r'(.*)')
self._tracings.append(self._trace_all_logs) self._tracings.append(self._trace_all_logs)
def trace(self, parsed_logs: List) -> List: def trace(self, parsed_logs: List) -> List[pd.DataFrame]:
dfs = [trace(parsed_logs[i]) for i, trace in enumerate(self._tracings) if trace is not None] dfs = [trace(parsed_logs[i]) for i, trace in enumerate(self._tracings) if trace is not None]
return dfs return dfs
@ -112,8 +112,8 @@ class WakuTracer(MessageTracer):
msg_sent_data = self.check_if_msg_has_been_sent(peers_missed_messages, missed_messages, sent_df) msg_sent_data = self.check_if_msg_has_been_sent(peers_missed_messages, missed_messages, sent_df)
for data in msg_sent_data: for data in msg_sent_data:
peer_id = data[0].split('*')[-1] peer_id = data[0].split('*')[-1]
logger.warning(f'Peer {peer_id} message information dumped in {issue_dump_location}') logger.info(f'Peer {peer_id} message information dumped in {issue_dump_location}')
match path.prepare_path(issue_dump_location / f"{data[0].split('*')[-1]}.csv"): match path_utils.prepare_path(issue_dump_location / f"{data[0].split('*')[-1]}.csv"):
case Ok(location_path): case Ok(location_path):
data[1].to_csv(location_path) data[1].to_csv(location_path)
case Err(err): case Err(err):

View File

@ -1,7 +1,9 @@
# Python Imports # Python Imports
import logging import logging
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path from pathlib import Path
from typing import List from typing import List, Dict
from result import Ok, Err, Result from result import Ok, Err, Result
# Project Imports # Project Imports
@ -19,7 +21,7 @@ class WakuMessageLogAnalyzer:
self._validate_analysis_location(timestamp_to_analyze, local_folder_to_analyze) self._validate_analysis_location(timestamp_to_analyze, local_folder_to_analyze)
self._set_up_paths(dump_analysis_dir, local_folder_to_analyze) self._set_up_paths(dump_analysis_dir, local_folder_to_analyze)
self._timestamp = timestamp_to_analyze self._timestamp = timestamp_to_analyze
self._set_victoria_config() # self._set_victoria_config()
def _validate_analysis_location(self, timestamp_to_analyze: str, local_folder_to_analyze: str): def _validate_analysis_location(self, timestamp_to_analyze: str, local_folder_to_analyze: str):
if timestamp_to_analyze is None and local_folder_to_analyze is None: if timestamp_to_analyze is None and local_folder_to_analyze is None:
@ -27,18 +29,27 @@ class WakuMessageLogAnalyzer:
exit(1) exit(1)
def _set_up_paths(self, dump_analysis_dir: str, local_folder_to_analyze: str): def _set_up_paths(self, dump_analysis_dir: str, local_folder_to_analyze: str):
self._dump_analysis_dir_path = Path(dump_analysis_dir) if dump_analysis_dir else None self._folder_path = Path(dump_analysis_dir) if dump_analysis_dir else Path(local_folder_to_analyze)
self._local_folder_to_analyze_path = Path(local_folder_to_analyze) if local_folder_to_analyze else None
def _set_victoria_config(self): def _get_victoria_config_parallel(self, pod_name: str) -> Dict:
self._victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query", return {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"}, "headers": {"Content-Type": "application/json"},
"params": [ "params": [
{ {
"query": f"kubernetes_container_name:waku AND received relay message AND _time:{self._timestamp} | sort by (_time)"}, "query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{pod_name} AND received relay message AND _time:{self._timestamp}"},
{ {
"query": f"kubernetes_container_name:waku AND sent relay message AND _time:{self._timestamp} | sort by (_time)"}] "query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{pod_name} AND sent relay message AND _time:{self._timestamp}"}]
} }
def _get_victoria_config_single(self) -> Dict:
return {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": [
{
"query": f"kubernetes_container_name:waku AND received relay message AND _time:{self._timestamp}"},
{
"query": f"kubernetes_container_name:waku AND sent relay message AND _time:{self._timestamp}"}]
}
def _get_affected_node_pod(self, data_file: str) -> Result[str, str]: def _get_affected_node_pod(self, data_file: str) -> Result[str, str]:
peer_id = data_file.split('.')[0] peer_id = data_file.split('.')[0]
@ -51,15 +62,16 @@ class WakuMessageLogAnalyzer:
result = reader.single_query_info() result = reader.single_query_info()
if result.is_ok(): if result.is_ok():
return Ok(result.unwrap()['kubernetes_pod_name']) pod_name = result.unwrap()['kubernetes_pod_name']
logger.debug(f'Pod name for peer id {peer_id} is {pod_name}')
return Ok(pod_name)
return Err(f'Unable to obtain pod name from {peer_id}') return Err(f'Unable to obtain pod name from {peer_id}')
def _get_affected_node_log(self, data_file: str): def _get_affected_node_log(self, data_file: str) -> Result[Path, str]:
result = self._get_affected_node_pod(data_file) result = self._get_affected_node_pod(data_file)
if result.is_err(): if result.is_err():
logger.warning(result.err_value) return Err(result.err_value)
return
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query", victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"}, "headers": {"Content-Type": "application/json"},
@ -72,49 +84,158 @@ class WakuMessageLogAnalyzer:
pod_log = reader.read() pod_log = reader.read()
log_lines = [inner_list[0] for inner_list in pod_log[0]] log_lines = [inner_list[0] for inner_list in pod_log[0]]
with open(self._dump_analysis_dir_path / f'{pod_log[0][0][1]}.log', 'w') as file: log_name_path = self._folder_path / f"{data_file.split('.')[0]}.log"
with open(log_name_path, 'w') as file:
for element in log_lines: for element in log_lines:
file.write(f"{element}\n") file.write(f"{element}\n")
return Ok(log_name_path)
def _dump_information(self, data_files: List[str]): def _dump_information(self, data_files: List[str]):
for data_file in data_files: with ProcessPoolExecutor() as executor:
logger.info(f'Dumping information for {data_file}') futures = {executor.submit(self._get_affected_node_log, data_file): data_file for data_file in data_files}
self._get_affected_node_log(data_file)
for future in as_completed(futures):
try:
result = future.result()
match result:
case Ok(log_path):
logger.info(f'{log_path} dumped')
case Err(_):
logger.warning(result.err_value)
except Exception as e:
logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
def _has_issues_in_local(self) -> bool: def _has_issues_in_local(self) -> bool:
waku_tracer = WakuTracer() waku_tracer = WakuTracer()
waku_tracer.with_received_pattern() waku_tracer.with_received_pattern()
waku_tracer.with_sent_pattern() waku_tracer.with_sent_pattern()
reader = FileReader(self._local_folder_to_analyze_path, waku_tracer) reader = FileReader(self._folder_path, waku_tracer)
dfs = reader.read() dfs = reader.read()
has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1], has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
self._dump_analysis_dir_path) self._folder_path)
return has_issues return has_issues
def _has_issues_in_cluster(self) -> bool: def _has_issues_in_cluster_single(self) -> bool:
waku_tracer = WakuTracer() waku_tracer = WakuTracer()
waku_tracer.with_received_pattern() waku_tracer.with_received_pattern()
waku_tracer.with_sent_pattern() waku_tracer.with_sent_pattern()
reader = VictoriaReader(self._victoria_config, waku_tracer) reader = VictoriaReader(self._get_victoria_config_single(), waku_tracer)
dfs = reader.read() dfs = reader.read()
has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1], has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
self._dump_analysis_dir_path) self._folder_path)
return has_issues return has_issues
def analyze_message_logs(self): def _read_logs_for_node(self, node_index, victoria_config_func) -> List[pd.DataFrame]:
waku_tracer = WakuTracer()
waku_tracer.with_received_pattern()
waku_tracer.with_sent_pattern()
config = victoria_config_func(node_index)
reader = VictoriaReader(config, waku_tracer)
data = reader.read()
logger.debug(f'Nodes-{node_index} analyzed')
return data
def _read_logs_concurrently(self, n_nodes: int) -> List[pd.DataFrame]:
dfs = []
with ProcessPoolExecutor() as executor:
futures = {executor.submit(self._read_logs_for_node, i, self._get_victoria_config_parallel): i
for i in range(n_nodes)}
for i, future in enumerate(as_completed(futures)):
i = i + 1
try:
df = future.result()
dfs.append(df)
if i % 50 == 0:
logger.info(f'Processed {i}/{n_nodes} nodes')
except Exception as e:
logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
return dfs
def _has_issues_in_cluster_parallel(self, n_nodes: int) -> bool:
dfs = self._read_logs_concurrently(n_nodes)
dfs = self._merge_dfs(dfs)
result = self._dump_dfs(dfs)
if result.is_err():
logger.warning(f'Issue dumping message summary. {result.err_value}')
exit(1)
waku_tracer = WakuTracer()
waku_tracer.with_received_pattern()
waku_tracer.with_sent_pattern()
has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
self._folder_path)
return has_issues
def _merge_dfs(self, dfs: List[pd.DataFrame]) -> List[pd.DataFrame]:
logger.info("Merging and sorting information")
dfs = list(zip(*dfs))
dfs = [pd.concat(tup, axis=0) for tup in dfs]
dfs[0].sort_index(inplace=True)
dfs[1].sort_index(inplace=True)
return dfs
def _dump_dfs(self, dfs: List[pd.DataFrame]) -> Result:
received = dfs[0].reset_index()
received = received.astype(str)
logger.info("Dumping received information")
result = file_utils.dump_df_as_csv(received, self._folder_path / 'summary' / 'received.csv', False)
if result.is_err():
logger.warning(result.err_value)
return Err(result.err_value)
sent = dfs[1].reset_index()
sent = sent.astype(str)
logger.info("Dumping sent information")
result = file_utils.dump_df_as_csv(sent, self._folder_path / 'summary' / 'sent.csv', False)
if result.is_err():
logger.warning(result.err_value)
return Err(result.err_value)
return Ok(None)
def _get_number_nodes(self) -> int:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_container_name:waku AND _time:{self._timestamp} AND kubernetes_pod_name:nodes | uniq by (kubernetes_pod_name)"}
}
reader = VictoriaReader(victoria_config, None)
result = reader.multi_query_info()
if result.is_ok():
return len(list(result.ok_value))
else:
logger.error(result.err_value)
exit(1)
def analyze_message_logs(self, parallel=False):
if self._timestamp is not None: if self._timestamp is not None:
has_issues = self._has_issues_in_cluster() logger.info('Analyzing from server')
n_nodes = self._get_number_nodes()
logger.info(f'Detected {n_nodes} pods')
has_issues = self._has_issues_in_cluster_parallel(
n_nodes) if parallel else self._has_issues_in_cluster_single()
if has_issues: if has_issues:
match file_utils.get_files_from_folder_path(Path(self._dump_analysis_dir_path)): match file_utils.get_files_from_folder_path(Path(self._folder_path), extension="csv"):
case Ok(data_files_names): case Ok(data_files_names):
self._dump_information(data_files_names) self._dump_information(data_files_names)
case Err(error): case Err(error):
logger.error(error) logger.error(error)
else: else:
logger.info('Analyzing from local')
_ = self._has_issues_in_local() _ = self._has_issues_in_local()
def analyze_message_timestamps(self, time_difference_threshold: int): def analyze_message_timestamps(self, time_difference_threshold: int):
@ -122,8 +243,7 @@ class WakuMessageLogAnalyzer:
Note that this function assumes that analyze_message_logs has been called, since timestamps will be checked Note that this function assumes that analyze_message_logs has been called, since timestamps will be checked
from logs. from logs.
""" """
folder_path = self._local_folder_to_analyze_path or self._dump_analysis_dir_path file_logs = file_utils.get_files_from_folder_path(self._folder_path, '*.log')
file_logs = file_utils.get_files_from_folder_path(folder_path, '*.log')
if file_logs.is_err(): if file_logs.is_err():
logger.error(file_logs.err_value) logger.error(file_logs.err_value)
return return
@ -131,7 +251,7 @@ class WakuMessageLogAnalyzer:
logger.info(f'Analyzing timestamps from {len(file_logs.ok_value)} files') logger.info(f'Analyzing timestamps from {len(file_logs.ok_value)} files')
for file in file_logs.ok_value: for file in file_logs.ok_value:
logger.debug(f'Analyzing timestamps for {file}') logger.debug(f'Analyzing timestamps for {file}')
time_jumps = log_utils.find_time_jumps(folder_path / file, time_difference_threshold) time_jumps = log_utils.find_time_jumps(self._folder_path / file, time_difference_threshold)
for jump in time_jumps: for jump in time_jumps:
logger.info(f'{jump[0]} to {jump[1]} -> {jump[2]}') logger.info(f'{file}: {jump[0]} to {jump[1]} -> {jump[2]}')

View File

@ -8,7 +8,7 @@ from kubernetes.client import V1PodList, V1Service
from kubernetes.stream import portforward from kubernetes.stream import portforward
# Project Imports # Project Imports
from src.utils import path from src.utils import path_utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -45,7 +45,7 @@ class KubernetesManager:
logs = api.read_namespaced_pod_log(pod_name, namespace=namespace) logs = api.read_namespaced_pod_log(pod_name, namespace=namespace)
path_location_result = path.prepare_path(location + pod_name + ".log") path_location_result = path_utils.prepare_path(location + pod_name + ".log")
if path_location_result.is_ok(): if path_location_result.is_ok():
with open(f"{path_location_result.ok_value}", "w") as log_file: with open(f"{path_location_result.ok_value}", "w") as log_file:

View File

@ -1,9 +1,11 @@
# Python Imports # Python Imports
import pandas as pd
import yaml import yaml
import logging import logging
from pathlib import Path from pathlib import Path
from typing import List, Dict from typing import List, Dict
from result import Result, Err, Ok from result import Result, Err, Ok
from src.utils import path_utils
# Project Imports # Project Imports
@ -20,14 +22,25 @@ def read_yaml_file(file_path: str) -> Dict:
return data return data
def get_files_from_folder_path(path: Path, extension: str = '*') -> Result[List, str]: def get_files_from_folder_path(path: Path, extension: str = '*') -> Result[List[str], str]:
if not path.exists(): if not path.exists():
return Err(f"{path} does not exist.") return Err(f"{path} does not exist.")
if extension[0] != '.': if not extension.startswith('*'):
extension = '.' + extension extension = '*.' + extension
files = [p.name for p in path.glob(extension) if p.is_file()] files = [p.name for p in path.glob(extension) if p.is_file()]
logger.debug(f"Found {len(files)} files in {path}") logger.debug(f"Found {len(files)} files in {path}")
logger.debug(f"Files are: {files}") logger.debug(f"Files are: {files}")
return Ok(files) return Ok(files)
def dump_df_as_csv(df: pd.DataFrame, file_location: Path, with_index: bool = True) -> Result[pd.DataFrame, str]:
result = path_utils.prepare_path(file_location)
if result.is_ok():
df.to_csv(result.ok_value, index=with_index)
logger.info(f'Dumped {file_location}')
return Ok(df)
return Err(f'{file_location} failed to dump.')