Merge pull request #11 from logos-co/chore-improve-error-handling

chore: Improve error handling
This commit is contained in:
Roman Zajic 2025-03-18 15:16:42 +08:00 committed by GitHub
commit 914157d629
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 27 additions and 18 deletions

View File

@ -35,6 +35,7 @@ class NomosCli:
  self._docker_manager = DockerManager(self._image_name)
  self._container_name = container_name
  self._container = None
+ self._stop_event = None
  cwd = os.getcwd()
  self._volumes = [cwd + "/" + volume for volume in self._volumes]
@ -53,7 +54,7 @@ class NomosCli:
  logger.debug(f"NomosCli command to run {cmd}")
- self._container = self._docker_manager.start_container(
+ self._container, self._stop_event = self._docker_manager.start_container(
  self._docker_manager.image,
  port_bindings=self._port_map,
  args=None,
@ -107,11 +108,11 @@ class NomosCli:
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def stop(self):
- self._container = stop(self._container)
+ self._container = stop(self._container, self._stop_event)
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def kill(self):
- self._container = kill(self._container)
+ self._container = kill(self._container, self._stop_event)
  def name(self):
  return self._container_name

View File

@ -30,6 +30,7 @@ class ProxyClient:
  self._docker_manager = DockerManager(self._image_name)
  self._container_name = container_name
  self._container = None
+ self._stop_event = None
  self._api = None
  self._invalid_api = None
@ -62,7 +63,7 @@ class ProxyClient:
  logger.debug(f"ProxyCLient command to run {cmd}")
- self._container = self._docker_manager.start_container(
+ self._container, self._stop_event = self._docker_manager.start_container(
  self._docker_manager.image,
  port_bindings=self._port_map,
  args=None,
@ -79,11 +80,11 @@ class ProxyClient:
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def stop(self):
- self._container = stop(self._container)
+ self._container = stop(self._container, self._stop_event)
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def kill(self):
- self._container = kill(self._container)
+ self._container = kill(self._container, self._stop_event)
  def name(self):
  return self._container_name

View File

@ -72,13 +72,14 @@ class DockerManager:
  logger.debug(f"Docker container run failed with exception {ex}")
  logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
- log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
+ stop_event = threading.Event()
+ log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path, stop_event))
  log_thread.daemon = True
  log_thread.start()
- return container
+ return container, stop_event
- def _log_container_output(self, container, log_path):
+ def _log_container_output(self, container, log_path, stop_event):
  os.makedirs(os.path.dirname(log_path), exist_ok=True)
  retry_count = 0
  start_time = time.time()
@ -102,6 +103,8 @@ class DockerManager:
  except (APIError, IOError) as e:
  retry_count += 1
  if retry_count >= 5:
+ if stop_event.is_set():
+ return
  logger.error(f"Max retries reached for container {container.short_id}. Exiting log stream.")
  return
  time.sleep(0.2)
@ -170,10 +173,11 @@ class DockerManager:
  return None
- def stop(container):
+ def stop(container, stop_event):
  if container:
  logger.debug(f"Stopping container with id {container.short_id}")
  container.stop()
+ stop_event.set()
  try:
  container.remove()
  except:
@ -183,10 +187,11 @@ def stop(container):
  return None
- def kill(container):
+ def kill(container, stop_event):
  if container:
  logger.debug(f"Killing container with id {container.short_id}")
  container.kill()
+ stop_event.set()
  try:
  container.remove()
  except:

View File

@ -35,6 +35,7 @@ class NomosNode:
  self._docker_manager = DockerManager(self._image_name)
  self._container_name = container_name
  self._container = None
+ self._stop_event = None
  cwd = os.getcwd()
  self._volumes = [cwd + "/" + volume for volume in self._volumes]
@ -67,7 +68,7 @@ class NomosNode:
  logger.debug(f"Port map {self._port_map}")
- self._container = self._docker_manager.start_container(
+ self._container, self._stop_event = self._docker_manager.start_container(
  self._docker_manager.image,
  port_bindings=self._port_map,
  args=default_args,
@ -83,11 +84,11 @@ class NomosNode:
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def stop(self):
- self._container = stop(self._container)
+ self._container = stop(self._container, self._stop_event)
  @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
  def kill(self):
- self._container = kill(self._container)
+ self._container = kill(self._container, self._stop_event)
  def restart(self):
  if self._container:

View File

@ -59,15 +59,16 @@ class StepsDataAvailability(StepsCommon):
  @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
  def disperse(my_self=self):
+ response = []
  try:
  if client_node is None:
  executor = my_self.find_executor_node()
  response = executor.send_dispersal_request(request)
  else:
  response = client_node.send_dispersal_request(request, send_invalid=send_invalid)
  except Exception as ex:
- assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
+ logger.error(f"Exception while dispersing data: {ex}")
+ raise
  assert hasattr(response, "status_code"), "Missing status_code"
@ -86,14 +87,14 @@ class StepsDataAvailability(StepsCommon):
  @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(interval), reraise=True)
  def get_range():
+ response = []
  try:
  if client_node is None:
  response = node.send_get_data_range_request(query)
  else:
  response = client_node.send_get_data_range_request(query, send_invalid=send_invalid)
  except Exception as ex:
- assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
+ logger.error(f"Exception while retrieving data: {ex}")
+ raise
  assert response_contains_data(response), "Get data range response is empty"