From 0ce52802f737dbe8cd8ceb5fadcf2e6278190629 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 10 Feb 2025 20:49:24 +0800 Subject: [PATCH] fix: add log stream parsing - try/except for docker.run --- src/cli/nomos_cli.py | 27 +++++++++ src/docker_manager.py | 65 +++++++++++++-------- tests/data_integrity/test_data_integrity.py | 3 +- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/src/cli/nomos_cli.py b/src/cli/nomos_cli.py index 2ba9cc7..b7449a6 100644 --- a/src/cli/nomos_cli.py +++ b/src/cli/nomos_cli.py @@ -1,5 +1,6 @@ import os +from src.data_storage import DS from src.libs.common import generate_log_prefix from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed @@ -55,6 +56,20 @@ class NomosCli: command=cmd, ) + DS.nomos_nodes.append(self) + + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) + def stop(self): + if self._container: + logger.debug(f"Stopping container with id {self._container.short_id}") + self._container.stop() + try: + self._container.remove() + except: + pass + self._container = None + logger.debug("Container stopped.") + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): if self._container: @@ -66,3 +81,15 @@ class NomosCli: pass self._container = None logger.debug("Container killed.") + + def get_reconstruct_result(self): + keywords = ["Reconstructed data"] + + log_stream = self._container.logs(stream=True) + + matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False, log_stream) + # assert not matches, f"Reconstructed data not found {matches}" + for match in matches: + logger.debug(f"match {match}\n\n\n") + + DS.nomos_nodes.remove(self) diff --git a/src/docker_manager.py b/src/docker_manager.py index a27505d..5306a80 100644 --- a/src/docker_manager.py +++ b/src/docker_manager.py @@ -50,18 +50,22 @@ class DockerManager: cli_args_str_for_log = " ".join(cli_args) logger.debug(f"docker run -i -t {port_bindings} {image_name} {cli_args_str_for_log}") - container = self._client.containers.run( - image_name, - command=cli_args, - ports=port_bindings, - detach=True, - remove=remove_container, - auto_remove=remove_container, - volumes=volumes, - entrypoint=entrypoint, - name=name, - network=NETWORK_NAME, - ) + + try: + container = self._client.containers.run( + image_name, + command=cli_args, + ports=port_bindings, + detach=True, + remove=remove_container, + auto_remove=remove_container, + volumes=volumes, + entrypoint=entrypoint, + name=name, + network=NETWORK_NAME, + ) + except Exception as ex: + logger.debug(f"Docker container run failed with exception {ex}") logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) @@ -128,19 +132,32 @@ class DockerManager: def image(self): return self._image - def search_log_for_keywords(self, log_path, keywords, use_regex=False): + def find_keywords_in_line(self, keywords, line, use_regex=False): matches = {keyword: [] for keyword in keywords} - # Open the log file and search line by line - with open(log_path, "r") as log_file: - for line in log_file: - for keyword in keywords: - if use_regex: - if re.search(keyword, line, re.IGNORECASE): - matches[keyword].append(line.strip()) - else: - if keyword.lower() in line.lower(): - matches[keyword].append(line.strip()) + for keyword in keywords: + if use_regex: + if re.search(keyword, line, re.IGNORECASE): + matches[keyword].append(line.strip()) + else: + if keyword.lower() in line.lower(): + matches[keyword].append(line.strip()) + + return matches + + def search_log_for_keywords(self, log_path, keywords, use_regex=False, log_stream=None): + matches = {} + + # Read from stream + if log_stream is not None: + for line in log_stream: + matches = self.find_keywords_in_line(keywords, line.decode("utf-8"), use_regex=use_regex) + + else: + # Open the log file and search line by line + with open(log_path, "r") as log_file: + for line in log_file: + matches = self.find_keywords_in_line(keywords, line, use_regex=use_regex) # Check if there were any matches if any(matches[keyword] for keyword in keywords): @@ -149,5 +166,5 @@ class DockerManager: logger.debug(f"Found matches for keyword '{keyword}': {lines}") return matches else: - logger.debug("No errors found in the nomos logs.") + logger.debug("No keywords found in the nomos logs.") return None diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py index ab03369..3f45ff0 100644 --- a/tests/data_integrity/test_data_integrity.py +++ b/tests/data_integrity/test_data_integrity.py @@ -31,10 +31,11 @@ class TestDataIntegrity(StepsDataAvailability): @pytest.mark.usefixtures("setup_2_node_cluster") def test_da_sampling_determines_data_presence(self): self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8) - delay(10) + delay(5) received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5]) rcv_data_json = json.dumps(received_data) cli = NomosCli(command="reconstruct") cli.run(input_values=[str(rcv_data_json)]) + cli.get_reconstruct_result() # assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8")