Handles LPStreamError in chunker (#947)

* Handles LPStreamError in chunker

* Adds test for lpstream exception

* Adds tests for other stream exceptions. Cleanup.
This commit is contained in:
Ben Bierens 2024-10-10 13:22:36 +02:00 committed by Slava
parent 859b7ea0e5
commit b0607d3fdb
No known key found for this signature in database
GPG Key ID: 351E7AA9BD0DFEB8
3 changed files with 44 additions and 4 deletions

View File

@ -92,8 +92,11 @@ proc new*(
trace "LPStreamChunker stream Eof", exc = exc.msg trace "LPStreamChunker stream Eof", exc = exc.msg
except CancelledError as error: except CancelledError as error:
raise error raise error
except LPStreamError as error:
error "LPStream error", err = error.msg
raise error
except CatchableError as exc: except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg error "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg) raise newException(Defect, exc.msg)
return res return res
@ -127,7 +130,7 @@ proc new*(
except CancelledError as error: except CancelledError as error:
raise error raise error
except CatchableError as exc: except CatchableError as exc:
trace "CatchableError exception", exc = exc.msg error "CatchableError exception", exc = exc.msg
raise newException(Defect, exc.msg) raise newException(Defect, exc.msg)
return total return total

View File

@ -161,7 +161,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
try: try:
without cid =? ( without cid =? (
await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error: await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error:
trace "Error uploading file", exc = error.msg error "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg) return RestApiResponse.error(Http500, error.msg)
codex_api_uploads.inc() codex_api_uploads.inc()

View File

@ -1,4 +1,3 @@
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/logutils import pkg/codex/logutils
@ -7,6 +6,17 @@ import pkg/chronos
import ../asynctest import ../asynctest
import ./helpers import ./helpers
type
CrashingStreamWrapper* = ref object of LPStream
toRaise*: ref CatchableError
method readOnce*(
self: CrashingStreamWrapper,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
raise self.toRaise
asyncchecksuite "Chunking": asyncchecksuite "Chunking":
test "should return proper size chunks": test "should return proper size chunks":
var offset = 0 var offset = 0
@ -78,3 +88,30 @@ asyncchecksuite "Chunking":
string.fromBytes(data) == readFile(path) string.fromBytes(data) == readFile(path)
fileChunker.offset == data.len fileChunker.offset == data.len
proc raiseStreamException(exc: ref CatchableError) {.async.} =
let stream = CrashingStreamWrapper.new()
let chunker = LPStreamChunker.new(
stream = stream,
chunkSize = 2'nb)
stream.toRaise = exc
discard (await chunker.getBytes())
test "stream should forward LPStreamError":
expect LPStreamError:
await raiseStreamException(newException(LPStreamError, "test error"))
test "stream should catch LPStreamEOFError":
await raiseStreamException(newException(LPStreamEOFError, "test error"))
test "stream should forward CancelledError":
expect CancelledError:
await raiseStreamException(newException(CancelledError, "test error"))
test "stream should forward LPStreamError":
expect LPStreamError:
await raiseStreamException(newException(LPStreamError, "test error"))
test "stream should convert other exceptions to defect":
expect Defect:
await raiseStreamException(newException(CatchableError, "test error"))