diff --git a/src/data/data_handler.py b/src/data/data_handler.py
index 2c97da2..832bef7 100644
--- a/src/data/data_handler.py
+++ b/src/data/data_handler.py
@@ -4,7 +4,7 @@ import pandas as pd
 from typing import List, Optional
 
 # Project Imports
-from src.utils import path
+from src.utils import path_utils
 
 logger = logging.getLogger(__name__)
 
@@ -15,7 +15,7 @@ class DataHandler:
         self._dataframe = pd.DataFrame()
 
     def dump_dataframe(self, dump_path: str):
-        result = path.prepare_path(dump_path)
+        result = path_utils.prepare_path(dump_path)
         if result.is_err():
             logger.error(f'{result.err_value}')
             exit(1)
diff --git a/src/mesh_analysis/readers/victoria_reader.py b/src/mesh_analysis/readers/victoria_reader.py
index 26ad23f..d155552 100644
--- a/src/mesh_analysis/readers/victoria_reader.py
+++ b/src/mesh_analysis/readers/victoria_reader.py
@@ -3,8 +3,9 @@ import json
 import logging
 import re
 import time
+import pandas as pd
 import requests
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Iterator
 from httpx import Response
 from result import Result, Ok, Err
 
@@ -22,8 +23,8 @@ class VictoriaReader:
         self.logs = []
 
     def _fetch_data(self, headers: Dict, params: Dict):
-        logger.info(f'Fetching {params}')
-        time.sleep(5)
+        logger.debug(f'Fetching {params}')
+        # time.sleep(5)
         with requests.post(self._config['url'], headers=headers, params=params, stream=True) as response:
             for line in response.iter_lines():
                 if line:
@@ -33,8 +34,7 @@ class VictoriaReader:
                         logger.info(line)
                         exit()
                     self.logs.append((parsed_object['_msg'], parsed_object['kubernetes_pod_name']))
-                    logger.debug("line added")
-        logger.info(f'Fetched {len(self.logs)} messages')
+        logger.debug(f'Fetched {len(self.logs)} messages')
 
     def _make_queries(self) -> List:
         results = [[] for _ in self._tracer.patterns]
@@ -47,13 +47,13 @@ class VictoriaReader:
                     match_as_list = list(match.groups())
                     match_as_list.append(log_line[1])
                     results[i].append(match_as_list)
-            logger.info('Fetched lines parsed with pattern')
+            # logger.debug('Fetched lines parsed with pattern')
             self.logs.clear()
 
         return results
 
-    def read(self) -> List:
-        logger.info(f'Reading {self._config["url"]}')
+    def read(self) -> List[pd.DataFrame]:
+        # logger.info(f'Reading {self._config["url"]}')
         results = self._make_queries()
         dfs = self._tracer.trace(results)
 
@@ -74,3 +74,16 @@ class VictoriaReader:
             logger.error(f'Failed to decode JSON: {e}')
             logger.error(f'Response content: {response.content}')
 
+        return Err(response)
+
+    def multi_query_info(self) -> Result[Iterator, str]:
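+        # The fixed delay presumably gives VictoriaLogs time to index the most
+        # recent entries before querying (assumption: no explicit flush/ack API
+        # is used). The response is then streamed lazily, one JSON line each.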
+        time.sleep(10)
+        response = requests.post(self._config['url'], headers=self._config['headers'], params=self._config['params'])
+        if response.status_code != 200:
+            logger.error(f'Request failed with status code: {response.status_code}')
+            return Err(response.text)
+
+        return Ok(response.iter_lines())
diff --git a/src/mesh_analysis/tracers/waku_tracer.py b/src/mesh_analysis/tracers/waku_tracer.py
index 755a8cc..735be59 100644
--- a/src/mesh_analysis/tracers/waku_tracer.py
+++ b/src/mesh_analysis/tracers/waku_tracer.py
@@ -8,7 +8,7 @@ from result import Ok, Err
 
 # Project Imports
 from src.mesh_analysis.tracers.message_tracer import MessageTracer
-from src.utils import file_utils, path
+from src.utils import path_utils
 
 logger = logging.getLogger(__name__)
 
@@ -37,7 +37,7 @@ class WakuTracer(MessageTracer):
         self._patterns.append(r'(.*)')
         self._tracings.append(self._trace_all_logs)
 
-    def trace(self, parsed_logs: List) -> List:
+    def trace(self, parsed_logs: List) -> List[pd.DataFrame]:
         dfs = [trace(parsed_logs[i]) for i, trace in enumerate(self._tracings) if trace is not None]
         return dfs
 
@@ -112,8 +112,8 @@ class WakuTracer(MessageTracer):
         msg_sent_data = self.check_if_msg_has_been_sent(peers_missed_messages, missed_messages, sent_df)
         for data in msg_sent_data:
             peer_id = data[0].split('*')[-1]
-            logger.warning(f'Peer {peer_id} message information dumped in {issue_dump_location}')
-            match path.prepare_path(issue_dump_location / f"{data[0].split('*')[-1]}.csv"):
+            logger.info(f'Peer {peer_id} message information dumped in {issue_dump_location}')
+            match path_utils.prepare_path(issue_dump_location / f"{peer_id}.csv"):
                 case Ok(location_path):
                     data[1].to_csv(location_path)
                 case Err(err):
diff --git a/src/mesh_analysis/waku_message_log_analyzer.py b/src/mesh_analysis/waku_message_log_analyzer.py
index 00957e2..013bd38 100644
--- a/src/mesh_analysis/waku_message_log_analyzer.py
+++ b/src/mesh_analysis/waku_message_log_analyzer.py
@@ -1,7 +1,9 @@
 # Python Imports
 import logging
+import pandas as pd
+from concurrent.futures import ProcessPoolExecutor, as_completed
 from pathlib import Path
-from typing import List
+from typing import List, Dict
 from result import Ok, Err, Result
 
 # Project Imports
@@ -19,7 +21,7 @@ class WakuMessageLogAnalyzer:
         self._validate_analysis_location(timestamp_to_analyze, local_folder_to_analyze)
         self._set_up_paths(dump_analysis_dir, local_folder_to_analyze)
         self._timestamp = timestamp_to_analyze
-        self._set_victoria_config()
+        # self._set_victoria_config()
 
     def _validate_analysis_location(self, timestamp_to_analyze: str, local_folder_to_analyze: str):
         if timestamp_to_analyze is None and local_folder_to_analyze is None:
@@ -27,18 +29,30 @@ class WakuMessageLogAnalyzer:
             exit(1)
 
     def _set_up_paths(self, dump_analysis_dir: str, local_folder_to_analyze: str):
-        self._dump_analysis_dir_path = Path(dump_analysis_dir) if dump_analysis_dir else None
-        self._local_folder_to_analyze_path = Path(local_folder_to_analyze) if local_folder_to_analyze else None
+        self._folder_path = Path(dump_analysis_dir) if dump_analysis_dir else Path(local_folder_to_analyze)
 
-    def _set_victoria_config(self):
-        self._victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
-                                 "headers": {"Content-Type": "application/json"},
-                                 "params": [
-                                     {
-                                         "query": f"kubernetes_container_name:waku AND received relay message AND _time:{self._timestamp} | sort by (_time)"},
-                                     {
-                                         "query": f"kubernetes_container_name:waku AND sent relay message AND _time:{self._timestamp} | sort by (_time)"}]
-                                 }
+    def _get_victoria_config_parallel(self, pod_name: str) -> Dict:
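+        # Callers pass the numeric node index as pod_name (see
+        # _read_logs_for_node); this assumes LogsQL word matching, e.g.
+        # kubernetes_pod_name:0 also matching the pod named nodes-0.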
+        return {"url": "https://vmselect.riff.cc/select/logsql/query",
+                "headers": {"Content-Type": "application/json"},
+                "params": [
+                    {
+                        "query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{pod_name} AND received relay message AND _time:{self._timestamp}"},
+                    {
+                        "query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{pod_name} AND sent relay message AND _time:{self._timestamp}"}]
+                }
+
+    def _get_victoria_config_single(self) -> Dict:
+        return {"url": "https://vmselect.riff.cc/select/logsql/query",
+                "headers": {"Content-Type": "application/json"},
+                "params": [
+                    {
+                        "query": f"kubernetes_container_name:waku AND received relay message AND _time:{self._timestamp}"},
+                    {
+                        "query": f"kubernetes_container_name:waku AND sent relay message AND _time:{self._timestamp}"}]
+                }
 
     def _get_affected_node_pod(self, data_file: str) -> Result[str, str]:
         peer_id = data_file.split('.')[0]
@@ -51,15 +65,16 @@ class WakuMessageLogAnalyzer:
         result = reader.single_query_info()
 
         if result.is_ok():
-            return Ok(result.unwrap()['kubernetes_pod_name'])
+            pod_name = result.unwrap()['kubernetes_pod_name']
+            logger.debug(f'Pod name for peer id {peer_id} is {pod_name}')
+            return Ok(pod_name)
 
         return Err(f'Unable to obtain pod name from {peer_id}')
 
-    def _get_affected_node_log(self, data_file: str):
+    def _get_affected_node_log(self, data_file: str) -> Result[Path, str]:
         result = self._get_affected_node_pod(data_file)
         if result.is_err():
-            logger.warning(result.err_value)
-            return
+            return Err(result.err_value)
 
         victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
                            "headers": {"Content-Type": "application/json"},
@@ -72,49 +87,162 @@ class WakuMessageLogAnalyzer:
         pod_log = reader.read()
 
         log_lines = [inner_list[0] for inner_list in pod_log[0]]
-        with open(self._dump_analysis_dir_path / f'{pod_log[0][0][1]}.log', 'w') as file:
+        log_name_path = self._folder_path / f"{data_file.split('.')[0]}.log"
+        with open(log_name_path, 'w') as file:
             for element in log_lines:
                 file.write(f"{element}\n")
 
+        return Ok(log_name_path)
+
     def _dump_information(self, data_files: List[str]):
-        for data_file in data_files:
-            logger.info(f'Dumping information for {data_file}')
-            self._get_affected_node_log(data_file)
+        with ProcessPoolExecutor() as executor:
+            futures = {executor.submit(self._get_affected_node_log, data_file): data_file for data_file in data_files}
+
+            for future in as_completed(futures):
+                try:
+                    result = future.result()
+                    match result:
+                        case Ok(log_path):
+                            logger.info(f'{log_path} dumped')
+                        case Err(_):
+                            logger.warning(result.err_value)
+                except Exception as e:
+                    logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
 
     def _has_issues_in_local(self) -> bool:
         waku_tracer = WakuTracer()
         waku_tracer.with_received_pattern()
         waku_tracer.with_sent_pattern()
-        reader = FileReader(self._local_folder_to_analyze_path, waku_tracer)
+        reader = FileReader(self._folder_path, waku_tracer)
         dfs = reader.read()
 
         has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
-                                                                self._dump_analysis_dir_path)
+                                                                self._folder_path)
 
         return has_issues
 
-    def _has_issues_in_cluster(self) -> bool:
+    def _has_issues_in_cluster_single(self) -> bool:
         waku_tracer = WakuTracer()
         waku_tracer.with_received_pattern()
         waku_tracer.with_sent_pattern()
-        reader = VictoriaReader(self._victoria_config, waku_tracer)
+        reader = VictoriaReader(self._get_victoria_config_single(), waku_tracer)
         dfs = reader.read()
 
         has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
-                                                                self._dump_analysis_dir_path)
+                                                                self._folder_path)
 
         return has_issues
 
-    def analyze_message_logs(self):
+    def _read_logs_for_node(self, node_index: int, victoria_config_func) -> List[pd.DataFrame]:
+        waku_tracer = WakuTracer()
+        waku_tracer.with_received_pattern()
+        waku_tracer.with_sent_pattern()
+
+        config = victoria_config_func(node_index)
+        reader = VictoriaReader(config, waku_tracer)
+        data = reader.read()
+        logger.debug(f'Nodes-{node_index} analyzed')
+
+        return data
+
+    def _read_logs_concurrently(self, n_nodes: int) -> List[pd.DataFrame]:
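+        # One worker per pod: each process builds its own VictoriaReader and
+        # WakuTracer, so no state is shared across processes. as_completed
+        # yields in completion order; _merge_dfs re-sorts the frames later.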
+        dfs = []
+        with ProcessPoolExecutor() as executor:
+            futures = {executor.submit(self._read_logs_for_node, i, self._get_victoria_config_parallel): i
+                       for i in range(n_nodes)}
+
+            for i, future in enumerate(as_completed(futures), start=1):
+                try:
+                    df = future.result()
+                    dfs.append(df)
+                    if i % 50 == 0:
+                        logger.info(f'Processed {i}/{n_nodes} nodes')
+
+                except Exception as e:
+                    logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
+
+        return dfs
+
+    def _has_issues_in_cluster_parallel(self, n_nodes: int) -> bool:
+        dfs = self._read_logs_concurrently(n_nodes)
+        dfs = self._merge_dfs(dfs)
+
+        result = self._dump_dfs(dfs)
+        if result.is_err():
+            logger.error(f'Issue dumping message summary. {result.err_value}')
+            exit(1)
+
+        waku_tracer = WakuTracer()
+        waku_tracer.with_received_pattern()
+        waku_tracer.with_sent_pattern()
+        has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
+                                                                self._folder_path)
+
+        return has_issues
+
+    def _merge_dfs(self, dfs: List[pd.DataFrame]) -> List[pd.DataFrame]:
+        logger.info("Merging and sorting information")
+        dfs = list(zip(*dfs))
+        dfs = [pd.concat(tup, axis=0) for tup in dfs]
+        dfs[0].sort_index(inplace=True)
+        dfs[1].sort_index(inplace=True)
+
+        return dfs
+
+    def _dump_dfs(self, dfs: List[pd.DataFrame]) -> Result:
+        received = dfs[0].reset_index()
+        received = received.astype(str)
+        logger.info("Dumping received information")
+        result = file_utils.dump_df_as_csv(received, self._folder_path / 'summary' / 'received.csv', False)
+        if result.is_err():
+            logger.warning(result.err_value)
+            return Err(result.err_value)
+
+        sent = dfs[1].reset_index()
+        sent = sent.astype(str)
+        logger.info("Dumping sent information")
+        result = file_utils.dump_df_as_csv(sent, self._folder_path / 'summary' / 'sent.csv', False)
+        if result.is_err():
+            logger.warning(result.err_value)
+            return Err(result.err_value)
+
+        return Ok(None)
+
+    def _get_number_nodes(self) -> int:
+        victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
+                           "headers": {"Content-Type": "application/json"},
+                           "params": {
+                               "query": f"kubernetes_container_name:waku AND _time:{self._timestamp} AND kubernetes_pod_name:nodes | uniq by (kubernetes_pod_name)"}
+                           }
+
+        reader = VictoriaReader(victoria_config, None)
+        result = reader.multi_query_info()
+        if result.is_ok():
+            return len(list(result.ok_value))
+        else:
+            logger.error(result.err_value)
+            exit(1)
+
+    def analyze_message_logs(self, parallel: bool = False):
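+        # parallel=True fans out one pair of queries per pod across processes;
+        # the default runs two cluster-wide queries in a single pass.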
""" - folder_path = self._local_folder_to_analyze_path or self._dump_analysis_dir_path - file_logs = file_utils.get_files_from_folder_path(folder_path, '*.log') + file_logs = file_utils.get_files_from_folder_path(self._folder_path, '*.log') if file_logs.is_err(): logger.error(file_logs.err_value) return @@ -131,7 +251,7 @@ class WakuMessageLogAnalyzer: logger.info(f'Analyzing timestamps from {len(file_logs.ok_value)} files') for file in file_logs.ok_value: logger.debug(f'Analyzing timestamps for {file}') - time_jumps = log_utils.find_time_jumps(folder_path / file, time_difference_threshold) + time_jumps = log_utils.find_time_jumps(self._folder_path / file, time_difference_threshold) for jump in time_jumps: - logger.info(f'{jump[0]} to {jump[1]} -> {jump[2]}') + logger.info(f'{file}: {jump[0]} to {jump[1]} -> {jump[2]}') diff --git a/src/metrics/kubernetes.py b/src/metrics/kubernetes.py index 2011b95..12fb926 100644 --- a/src/metrics/kubernetes.py +++ b/src/metrics/kubernetes.py @@ -8,7 +8,7 @@ from kubernetes.client import V1PodList, V1Service from kubernetes.stream import portforward # Project Imports -from src.utils import path +from src.utils import path_utils logger = logging.getLogger(__name__) @@ -45,7 +45,7 @@ class KubernetesManager: logs = api.read_namespaced_pod_log(pod_name, namespace=namespace) - path_location_result = path.prepare_path(location + pod_name + ".log") + path_location_result = path_utils.prepare_path(location + pod_name + ".log") if path_location_result.is_ok(): with open(f"{path_location_result.ok_value}", "w") as log_file: diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py index 1b74c0c..ee96cf6 100644 --- a/src/utils/file_utils.py +++ b/src/utils/file_utils.py @@ -1,9 +1,11 @@ # Python Imports +import pandas as pd import yaml import logging from pathlib import Path from typing import List, Dict from result import Result, Err, Ok +from src.utils import path_utils # Project Imports @@ -20,14 +22,25 @@ def read_yaml_file(file_path: str) -> Dict: return data -def get_files_from_folder_path(path: Path, extension: str = '*') -> Result[List, str]: +def get_files_from_folder_path(path: Path, extension: str = '*') -> Result[List[str], str]: if not path.exists(): return Err(f"{path} does not exist.") - if extension[0] != '.': - extension = '.' + extension + if not extension.startswith('*'): + extension = '*.' + extension + files = [p.name for p in path.glob(extension) if p.is_file()] logger.debug(f"Found {len(files)} files in {path}") logger.debug(f"Files are: {files}") return Ok(files) + + +def dump_df_as_csv(df: pd.DataFrame, file_location: Path, with_index: bool = True) -> Result[pd.DataFrame, str]: + result = path_utils.prepare_path(file_location) + if result.is_ok(): + df.to_csv(result.ok_value, index=with_index) + logger.info(f'Dumped {file_location}') + return Ok(df) + + return Err(f'{file_location} failed to dump.') diff --git a/src/utils/path.py b/src/utils/path_utils.py similarity index 100% rename from src/utils/path.py rename to src/utils/path_utils.py