fix: add log stream parsing

- try/except for docker.run
This commit is contained in:
Roman 2025-02-10 20:49:24 +08:00
parent d05a078925
commit 0ce52802f7
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75
3 changed files with 70 additions and 25 deletions

View File

@ -1,5 +1,6 @@
import os
from src.data_storage import DS
from src.libs.common import generate_log_prefix
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
@ -55,6 +56,20 @@ class NomosCli:
command=cmd,
)
DS.nomos_nodes.append(self)
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
    """Stop this node's docker container, retrying for up to 5 seconds.

    Idempotent: does nothing when no container is attached. On success
    `self._container` is reset to None so repeated calls are safe.
    """
    if self._container:
        logger.debug(f"Stopping container with id {self._container.short_id}")
        self._container.stop()
        try:
            self._container.remove()
        except Exception:
            # Best-effort cleanup: the container may already be gone
            # (e.g. it was started with auto_remove). A bare `except:`
            # here would also swallow KeyboardInterrupt/SystemExit.
            pass
        self._container = None
        logger.debug("Container stopped.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
@ -66,3 +81,15 @@ class NomosCli:
pass
self._container = None
logger.debug("Container killed.")
def get_reconstruct_result(self):
    """Follow the container's log stream, log entries matching the
    reconstruction marker, then deregister this node from DS.

    NOTE(review): the previously commented-out check was inverted
    (`assert not matches` paired with a "not found" message); the
    assertion stays disabled here, but the loop below now guards
    against `search_log_for_keywords` returning None.
    """
    keywords = ["Reconstructed data"]
    # logs(stream=True) yields the container output as a byte stream.
    log_stream = self._container.logs(stream=True)
    matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False, log_stream)
    # search_log_for_keywords returns None when nothing matched;
    # iterating None would raise TypeError, so fall back to empty.
    for match in matches or []:
        logger.debug(f"match {match}\n\n\n")
    DS.nomos_nodes.remove(self)

View File

@ -50,18 +50,22 @@ class DockerManager:
cli_args_str_for_log = " ".join(cli_args)
logger.debug(f"docker run -i -t {port_bindings} {image_name} {cli_args_str_for_log}")
container = self._client.containers.run(
image_name,
command=cli_args,
ports=port_bindings,
detach=True,
remove=remove_container,
auto_remove=remove_container,
volumes=volumes,
entrypoint=entrypoint,
name=name,
network=NETWORK_NAME,
)
try:
container = self._client.containers.run(
image_name,
command=cli_args,
ports=port_bindings,
detach=True,
remove=remove_container,
auto_remove=remove_container,
volumes=volumes,
entrypoint=entrypoint,
name=name,
network=NETWORK_NAME,
)
except Exception as ex:
logger.debug(f"Docker container run failed with exception {ex}")
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
@ -128,19 +132,32 @@ class DockerManager:
def image(self):
return self._image
def find_keywords_in_line(self, keywords, line, use_regex=False):
    """Match a single log line against every keyword.

    :param keywords: iterable of keywords (plain substrings, or regex
        patterns when use_regex is True).
    :param line: one decoded log line.
    :param use_regex: treat keywords as case-insensitive regex patterns
        instead of case-insensitive substrings.
    :return: dict mapping every keyword to a list that contains the
        stripped line when it matched, else stays empty.
    """
    matches = {keyword: [] for keyword in keywords}
    for keyword in keywords:
        if use_regex:
            if re.search(keyword, line, re.IGNORECASE):
                matches[keyword].append(line.strip())
        else:
            if keyword.lower() in line.lower():
                matches[keyword].append(line.strip())
    return matches
def search_log_for_keywords(self, log_path, keywords, use_regex=False, log_stream=None):
    """Search logs for keywords, either from a live container log
    stream or from the log file at `log_path`.

    :param log_path: path of the log file (used when log_stream is None).
    :param keywords: keywords/patterns forwarded to find_keywords_in_line.
    :param use_regex: treat keywords as regex patterns.
    :param log_stream: optional iterable of raw bytes lines (e.g. from
        docker `logs(stream=True)`); takes precedence over log_path.
    :return: {keyword: [matching lines]} when anything matched, else None.
    """
    # Accumulate hits across ALL lines. The previous version rebound
    # `matches` to the result of each line, so only the final line's
    # matches survived and earlier hits were silently dropped.
    matches = {keyword: [] for keyword in keywords}
    if log_stream is not None:
        # Docker log streams yield bytes; decode before matching.
        for raw_line in log_stream:
            found = self.find_keywords_in_line(keywords, raw_line.decode("utf-8"), use_regex=use_regex)
            for keyword, lines in found.items():
                matches[keyword].extend(lines)
    else:
        # Open the log file and search line by line.
        with open(log_path, "r") as log_file:
            for line in log_file:
                found = self.find_keywords_in_line(keywords, line, use_regex=use_regex)
                for keyword, lines in found.items():
                    matches[keyword].extend(lines)
    # Check if there were any matches
    if any(matches[keyword] for keyword in keywords):
        for keyword, lines in matches.items():
            if lines:
                logger.debug(f"Found matches for keyword '{keyword}': {lines}")
        return matches
    else:
        logger.debug("No keywords found in the nomos logs.")
        return None

View File

@ -31,10 +31,11 @@ class TestDataIntegrity(StepsDataAvailability):
@pytest.mark.usefixtures("setup_2_node_cluster")
def test_da_sampling_determines_data_presence(self):
    """Disperse a blob, wait for propagation, fetch the range back from
    node2 and feed it to the reconstruct CLI, checking its log output."""
    self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
    # Give the cluster time to propagate the dispersed blob.
    delay(5)
    received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
    rcv_data_json = json.dumps(received_data)
    cli = NomosCli(command="reconstruct")
    cli.run(input_values=[str(rcv_data_json)])
    cli.get_reconstruct_result()
    # TODO(review): the end-to-end assertion is disabled, so this test
    # currently cannot fail on wrong data — re-enable once the
    # reconstruct output parsing is reliable.
    # assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8")