diff --git a/codex/tarballs/directorydownloader.nim b/codex/tarballs/directorydownloader.nim index ffc78c40..63e3fb30 100644 --- a/codex/tarballs/directorydownloader.nim +++ b/codex/tarballs/directorydownloader.nim @@ -23,7 +23,6 @@ import pkg/stew/byteutils import ../node import ../logutils import ../utils/iter -import ../utils/safeasynciter import ../utils/trackedfutures import ../errors import ../manifest @@ -89,115 +88,122 @@ proc createEntryHeader( proc fetchTarball( self: DirectoryDownloader, cid: Cid, basePath = "" ): Future[?!void] {.async: (raises: [CancelledError]).} = - echo "fetchTarball: ", cid, " basePath = ", basePath - # we got a Cid - let's check if this is a manifest (can be either - # a directory or file manifest) - without isM =? cid.isManifest, err: - warn "Unable to determine if cid is a manifest" - return failure("Unable to determine if cid is a manifest") + # we only need try/catch here because PR for checked exceptions is + # not yet merged + try: + echo "fetchTarball: ", cid, " basePath = ", basePath + # we got a Cid - let's check if this is a manifest (can be either + # a directory or file manifest) + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is a manifest" + return failure("Unable to determine if cid is a manifest") - if not isM: - # this is not a manifest, so we can return - return failure("given cid is not a manifest: " & $cid) + if not isM: + # this is not a manifest, so we can return + return failure("given cid is not a manifest: " & $cid) - # get the manifest - without blk =? await self.node.blockStore.getBlock(cid), err: - error "Error retrieving manifest block", cid, err = err.msg - return - failure("Error retrieving manifest block (cid = " & $cid & "), err = " & err.msg) + # get the manifest + without blk =? await self.node.blockStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return failure( + "Error retrieving manifest block (cid = " & $cid & "), err = " & err.msg + ) - without manifest =? Manifest.decode(blk), err: - info "Unable to decode as manifest - trying to decode as directory manifest", - err = err.msg - # Try if it not a directory manifest - without manifest =? DirectoryManifest.decode(blk), err: - error "Unable to decode as directory manifest", err = err.msg - return failure("Unable to decode as valid manifest (cid = " & $cid & ")") - # this is a directory manifest - echo "Decoded directory manifest: ", $manifest - let dirEntry = TarballEntry( - kind: ekDirectory, - name: manifest.name, + without manifest =? Manifest.decode(blk), err: + info "Unable to decode as manifest - trying to decode as directory manifest", + err = err.msg + # Try if it not a directory manifest + without manifest =? DirectoryManifest.decode(blk), err: + error "Unable to decode as directory manifest", err = err.msg + return failure("Unable to decode as valid manifest (cid = " & $cid & ")") + # this is a directory manifest + echo "Decoded directory manifest: ", $manifest + let dirEntry = TarballEntry( + kind: ekDirectory, + name: manifest.name, + lastModified: getTime(), # ToDo: store actual time in the manifest + permissions: parseFilePermissions(cast[uint32](0o755)), # same here + contentLength: 0, + ) + let header = self.createEntryHeader(dirEntry, basePath) + echo "header = ", header + await self.queue.addLast(header.toBytes()) + self.printQueue() + var entryLength = header.len + let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned + if alignedEntryLength - entryLength > 0: + echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" + var data = newSeq[byte]() + data.setLen(alignedEntryLength - entryLength) + await self.queue.addLast(data) + self.printQueue() + + for cid in manifest.cids: + echo "fetching directory content: ", cid + if err =? (await self.fetchTarball(cid, basePath / manifest.name)).errorOption: + error "Error fetching directory content", + cid, path = basePath / manifest.name, err = err.msg + return failure( + "Error fetching directory content (cid = " & $cid & "), err = " & err.msg + ) + echo "fetchTarball[DIR]: ", cid, " basePath = ", basePath, " done" + return success() + + # this is a regular file (Codex) manifest + echo "Decoded file manifest: ", $manifest + let fileEntry = TarballEntry( + kind: ekNormalFile, + name: manifest.filename |? "unknown", lastModified: getTime(), # ToDo: store actual time in the manifest - permissions: parseFilePermissions(cast[uint32](0o755)), # same here - contentLength: 0, + permissions: parseFilePermissions(cast[uint32](0o644)), # same here + contentLength: manifest.datasetSize.int, ) - let header = self.createEntryHeader(dirEntry, basePath) - echo "header = ", header + let header = self.createEntryHeader(fileEntry, basePath) await self.queue.addLast(header.toBytes()) self.printQueue() - var entryLength = header.len + var contentLength = 0 + + proc onBatch(blocks: seq[Block]): Future[?!void] {.async.} = + echo "onBatch: ", blocks.len, " blocks" + for blk in blocks: + echo "onBatch[blk.data]: ", string.fromBytes(blk.data) + # await self.queue.addLast(string.fromBytes(blk.data)) + await self.queue.addLast(blk.data) + self.printQueue() + contentLength += blk.data.len + # this can happen if the content was stored with padding + if contentLength > manifest.datasetSize.int: + contentLength = manifest.datasetSize.int + echo "onBatch[contentLength]: ", contentLength + success() + + await self.node.fetchDatasetAsync(manifest, fetchLocal = true, onBatch = onBatch) + + echo "contentLength: ", contentLength + echo "manifest.datasetSize.int: ", manifest.datasetSize.int + if contentLength != manifest.datasetSize.int: + echo "Warning: entry length mismatch, expected ", + manifest.datasetSize.int, " got ", contentLength + + let entryLength = header.len + contentLength let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned if alignedEntryLength - entryLength > 0: echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" var data = newSeq[byte]() + echo "alignedEntryLength: ", alignedEntryLength + echo "entryLength: ", entryLength + echo "alignedEntryLength - entryLength: ", alignedEntryLength - entryLength data.setLen(alignedEntryLength - entryLength) + echo "data.len: ", data.len await self.queue.addLast(data) - self.printQueue() - - for cid in manifest.cids: - echo "fetching directory content: ", cid - if err =? (await self.fetchTarball(cid, basePath / manifest.name)).errorOption: - error "Error fetching directory content", - cid, path = basePath / manifest.name, err = err.msg - return failure( - "Error fetching directory content (cid = " & $cid & "), err = " & err.msg - ) - echo "fetchTarball[DIR]: ", cid, " basePath = ", basePath, " done" - return success() - - # this is a regular file (Codex) manifest - echo "Decoded file manifest: ", $manifest - let fileEntry = TarballEntry( - kind: ekNormalFile, - name: manifest.filename |? "unknown", - lastModified: getTime(), # ToDo: store actual time in the manifest - permissions: parseFilePermissions(cast[uint32](0o644)), # same here - contentLength: manifest.datasetSize.int, - ) - let header = self.createEntryHeader(fileEntry, basePath) - await self.queue.addLast(header.toBytes()) - self.printQueue() - var contentLength = 0 - - proc onBatch( - blocks: seq[Block] - ): Future[?!void] {.async: (raises: [CancelledError]).} = - echo "onBatch: ", blocks.len, " blocks" - for blk in blocks: - echo "onBatch[blk.data]: ", string.fromBytes(blk.data) - # await self.queue.addLast(string.fromBytes(blk.data)) - await self.queue.addLast(blk.data) self.printQueue() - contentLength += blk.data.len - # this can happen if the content was stored with padding - if contentLength > manifest.datasetSize.int: - contentLength = manifest.datasetSize.int - echo "onBatch[contentLength]: ", contentLength - success() - - await self.node.fetchDatasetAsync(manifest, fetchLocal = true, onBatch = onBatch) - - echo "contentLength: ", contentLength - echo "manifest.datasetSize.int: ", manifest.datasetSize.int - if contentLength != manifest.datasetSize.int: - echo "Warning: entry length mismatch, expected ", - manifest.datasetSize.int, " got ", contentLength - - let entryLength = header.len + contentLength - let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned - if alignedEntryLength - entryLength > 0: - echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" - var data = newSeq[byte]() - echo "alignedEntryLength: ", alignedEntryLength - echo "entryLength: ", entryLength - echo "alignedEntryLength - entryLength: ", alignedEntryLength - entryLength - data.setLen(alignedEntryLength - entryLength) - echo "data.len: ", data.len - await self.queue.addLast(data) - self.printQueue() - echo "fetchTarball: ", cid, " basePath = ", basePath, " done" - return success() + echo "fetchTarball: ", cid, " basePath = ", basePath, " done" + return success() + except CancelledError as e: + raise e + except CatchableError as e: + error "Error fetching tarball", cid, err = e.msg + return failure("Error fetching tarball (cid = " & $cid & "), err = " & e.msg) proc streamDirectory( self: DirectoryDownloader, cid: Cid diff --git a/codex/tarballs/tarballnodeextensions.nim b/codex/tarballs/tarballnodeextensions.nim index 9aa36fbd..fb4c4b9e 100644 --- a/codex/tarballs/tarballnodeextensions.nim +++ b/codex/tarballs/tarballnodeextensions.nim @@ -21,24 +21,32 @@ proc fetchDirectoryManifest*( ## Fetch and decode a manifest block ## - if err =? cid.isManifest.errorOption: - return failure "CID has invalid content type for manifest {$cid}" + # we only need try/catch here because PR for checked exceptions is + # not yet merged + try: + if err =? cid.isManifest.errorOption: + return failure "CID has invalid content type for manifest {$cid}" - trace "Retrieving directory manifest for cid", cid + trace "Retrieving directory manifest for cid", cid - without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err: - trace "Error retrieving directory manifest block", cid, err = err.msg - return failure err + without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err: + trace "Error retrieving directory manifest block", cid, err = err.msg + return failure err - trace "Decoding directory manifest for cid", cid + trace "Decoding directory manifest for cid", cid - without manifest =? DirectoryManifest.decode(blk), err: - trace "Unable to decode as directory manifest", err = err.msg - return failure("Unable to decode as directory manifest") + without manifest =? DirectoryManifest.decode(blk), err: + trace "Unable to decode as directory manifest", err = err.msg + return failure("Unable to decode as directory manifest") - trace "Decoded directory manifest", cid + trace "Decoded directory manifest", cid - manifest.success + return manifest.success + except CancelledError as e: + raise e + except CatchableError as e: + trace "Error fetching directory manifest", cid, err = e.msg + return failure(e.msg) proc storeDirectoryManifest*( self: CodexNodeRef, manifest: DirectoryManifest