mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-05 06:53:06 +00:00
Fixes Codex crashes on interrupted downloads (#1151)
* fix: fixes Codex crashes on interrupted downloads * fix: add better feedback to 404, minor rewording in test comment
This commit is contained in:
parent
eb09e610d5
commit
2a3a29720f
@ -65,9 +65,15 @@ proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
|
||||
|
||||
return %RestContentList.init(content)
|
||||
|
||||
proc isPending(resp: HttpResponseRef): bool =
|
||||
## Checks that an HttpResponseRef object is still pending; i.e.,
|
||||
## that no body has yet been sent. This helps us guard against calling
|
||||
## sendBody(resp: HttpResponseRef, ...) twice, which is illegal.
|
||||
return resp.getResponseState() == HttpResponseState.Empty
|
||||
|
||||
proc retrieveCid(
|
||||
node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef
|
||||
): Future[RestApiResponse] {.async.} =
|
||||
): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} =
|
||||
## Download a file from the node in a streaming
|
||||
## manner
|
||||
##
|
||||
@ -79,16 +85,21 @@ proc retrieveCid(
|
||||
without stream =? (await node.retrieve(cid, local)), error:
|
||||
if error of BlockNotFoundError:
|
||||
resp.status = Http404
|
||||
return await resp.sendBody("")
|
||||
await resp.sendBody(
|
||||
"The requested CID could not be retrieved (" & error.msg & ")."
|
||||
)
|
||||
return
|
||||
else:
|
||||
resp.status = Http500
|
||||
return await resp.sendBody(error.msg)
|
||||
await resp.sendBody(error.msg)
|
||||
return
|
||||
|
||||
# It is ok to fetch again the manifest because it will hit the cache
|
||||
without manifest =? (await node.fetchManifest(cid)), err:
|
||||
error "Failed to fetch manifest", err = err.msg
|
||||
resp.status = Http404
|
||||
return await resp.sendBody(err.msg)
|
||||
await resp.sendBody(err.msg)
|
||||
return
|
||||
|
||||
if manifest.mimetype.isSome:
|
||||
resp.setHeader("Content-Type", manifest.mimetype.get())
|
||||
@ -119,10 +130,13 @@ proc retrieveCid(
|
||||
await resp.sendChunk(addr buff[0], buff.len)
|
||||
await resp.finish()
|
||||
codex_api_downloads.inc()
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "Error streaming blocks", exc = exc.msg
|
||||
resp.status = Http500
|
||||
return await resp.sendBody("")
|
||||
if resp.isPending():
|
||||
await resp.sendBody(exc.msg)
|
||||
finally:
|
||||
info "Sent bytes", cid = cid, bytes
|
||||
if not stream.isNil:
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import std/httpclient
|
||||
import std/importutils
|
||||
import std/net
|
||||
import std/sequtils
|
||||
import std/strformat
|
||||
from pkg/libp2p import `==`, `$`, Cid
|
||||
@ -305,3 +307,31 @@ twonodessuite "REST API":
|
||||
let cid = Manifest.example().makeManifestBlock().get.cid
|
||||
let response = client1.deleteRaw($cid)
|
||||
check response.status == "204 No Content"
|
||||
|
||||
test "should not crash if the download stream is closed before download completes",
|
||||
twoNodesConfig:
|
||||
privateAccess(client1.type)
|
||||
privateAccess(client1.http.type)
|
||||
|
||||
let cid = client1.upload(repeat("some file contents", 1000)).get
|
||||
|
||||
try:
|
||||
# Sadly, there's no high level API for preventing the client from
|
||||
# consuming the whole response, and we need to close the socket
|
||||
# before that happens if we want to trigger the bug, so we need to
|
||||
# resort to this.
|
||||
client1.http.getBody = false
|
||||
let response = client1.downloadRaw($cid)
|
||||
|
||||
# Read 4 bytes from the stream just to make sure we actually
|
||||
# receive some data.
|
||||
let data = client1.http.socket.recv(4)
|
||||
check data.len == 4
|
||||
|
||||
# Prematurely closes the connection.
|
||||
client1.http.close()
|
||||
finally:
|
||||
client1.http.getBody = true
|
||||
|
||||
let response = client1.downloadRaw($cid)
|
||||
check response.body == repeat("some file contents", 1000)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user