diff --git a/src/client/nomos_cli.py b/src/client/nomos_cli.py index f01da6c..cf5bf94 100644 --- a/src/client/nomos_cli.py +++ b/src/client/nomos_cli.py @@ -35,6 +35,7 @@ class NomosCli: self._docker_manager = DockerManager(self._image_name) self._container_name = container_name self._container = None + self._stop_event = None cwd = os.getcwd() self._volumes = [cwd + "/" + volume for volume in self._volumes] @@ -53,7 +54,7 @@ class NomosCli: logger.debug(f"NomosCli command to run {cmd}") - self._container = self._docker_manager.start_container( + self._container, self._stop_event = self._docker_manager.start_container( self._docker_manager.image, port_bindings=self._port_map, args=None, @@ -107,11 +108,11 @@ class NomosCli: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - self._container = stop(self._container) + self._container = stop(self._container, self._stop_event) @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): - self._container = kill(self._container) + self._container = kill(self._container, self._stop_event) def name(self): return self._container_name diff --git a/src/client/proxy_client.py b/src/client/proxy_client.py index 86dc908..51aab06 100644 --- a/src/client/proxy_client.py +++ b/src/client/proxy_client.py @@ -30,6 +30,7 @@ class ProxyClient: self._docker_manager = DockerManager(self._image_name) self._container_name = container_name self._container = None + self._stop_event = None self._api = None self._invalid_api = None @@ -62,7 +63,7 @@ class ProxyClient: logger.debug(f"ProxyCLient command to run {cmd}") - self._container = self._docker_manager.start_container( + self._container, self._stop_event = self._docker_manager.start_container( self._docker_manager.image, port_bindings=self._port_map, args=None, @@ -79,11 +80,11 @@ class ProxyClient: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - self._container = stop(self._container) + self._container = stop(self._container, self._stop_event) @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): - self._container = kill(self._container) + self._container = kill(self._container, self._stop_event) def name(self): return self._container_name diff --git a/src/docker_manager.py b/src/docker_manager.py index 46d1905..2e0dadc 100644 --- a/src/docker_manager.py +++ b/src/docker_manager.py @@ -72,13 +72,14 @@ class DockerManager: logger.debug(f"Docker container run failed with exception {ex}") logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") - log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) + stop_event = threading.Event() + log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path, stop_event)) log_thread.daemon = True log_thread.start() - return container + return container, stop_event - def _log_container_output(self, container, log_path): + def _log_container_output(self, container, log_path, stop_event): os.makedirs(os.path.dirname(log_path), exist_ok=True) retry_count = 0 start_time = time.time() @@ -102,6 +103,8 @@ class DockerManager: except (APIError, IOError) as e: retry_count += 1 if retry_count >= 5: + if stop_event.is_set(): + return logger.error(f"Max retries reached for container {container.short_id}. Exiting log stream.") return time.sleep(0.2) @@ -170,10 +173,11 @@ class DockerManager: return None -def stop(container): +def stop(container, stop_event): if container: logger.debug(f"Stopping container with id {container.short_id}") container.stop() + stop_event.set() try: container.remove() except: @@ -183,10 +187,11 @@ def stop(container): return None -def kill(container): +def kill(container, stop_event): if container: logger.debug(f"Killing container with id {container.short_id}") container.kill() + stop_event.set() try: container.remove() except: diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py index 0d372b7..5bc9b88 100644 --- a/src/node/nomos_node.py +++ b/src/node/nomos_node.py @@ -35,6 +35,7 @@ class NomosNode: self._docker_manager = DockerManager(self._image_name) self._container_name = container_name self._container = None + self._stop_event = None cwd = os.getcwd() self._volumes = [cwd + "/" + volume for volume in self._volumes] @@ -67,7 +68,7 @@ class NomosNode: logger.debug(f"Port map {self._port_map}") - self._container = self._docker_manager.start_container( + self._container, self._stop_event = self._docker_manager.start_container( self._docker_manager.image, port_bindings=self._port_map, args=default_args, @@ -83,11 +84,11 @@ class NomosNode: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - self._container = stop(self._container) + self._container = stop(self._container, self._stop_event) @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): - self._container = kill(self._container) + self._container = kill(self._container, self._stop_event) def restart(self): if self._container: diff --git a/src/steps/da.py b/src/steps/da.py index 8fd60e5..95509b6 100644 --- a/src/steps/da.py +++ b/src/steps/da.py @@ -59,15 +59,16 @@ class StepsDataAvailability(StepsCommon): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True) def disperse(my_self=self): - response = [] try: if client_node is None: executor = my_self.find_executor_node() response = executor.send_dispersal_request(request) else: response = client_node.send_dispersal_request(request, send_invalid=send_invalid) + except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + logger.error(f"Exception while dispersing data: {ex}") + raise assert hasattr(response, "status_code"), "Missing status_code" @@ -86,14 +87,14 @@ class StepsDataAvailability(StepsCommon): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True) def get_range(): - response = [] try: if client_node is None: response = node.send_get_data_range_request(query) else: response = client_node.send_get_data_range_request(query, send_invalid=send_invalid) except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + logger.error(f"Exception while retrieving data: {ex}") + raise assert response_contains_data(response), "Get data range response is empty"