diff --git a/.gitignore b/.gitignore index f6292dda..17369c69 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,4 @@ nim.cfg tests/integration/logs data/ +data40k.bin \ No newline at end of file diff --git a/codex/node.nim b/codex/node.nim index 79bd962c..fc44c9fa 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -648,22 +648,136 @@ proc store*( return manifestBlk.cid.success +proc storePieces*( + self: CodexNodeRef, + stream: LPStream, + filename: ?string = string.none, + mimetype: ?string = string.none, + blockSize: NBytes, + pieceLength = NBytes 1024 * 64, +): Future[?!BitTorrentManifest] {.async.} = + ## Save stream contents as dataset with given blockSize + ## to nodes's BlockStore, and return Cid of its manifest + ## + info "Storing data" + + let + hcodec = Sha256HashCodec + dataCodec = BlockCodec + chunker = LPStreamChunker.new(stream, chunkSize = blockSize) + numOfBlocksPerPiece = pieceLength.int div blockSize.int + + var + cids: seq[Cid] + pieces: seq[MultiHash] + pieceHashCtx: sha1 + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + + pieceHashCtx.init() + + try: + while (let chunk = await chunker.getBytes(); chunk.len > 0): + if pieceIter.finished: + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, + err: + return failure(err) + pieces.add(mh) + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + pieceHashCtx.init() + without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err: + return failure(err) + + without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err: + return failure(err) + + without blk =? bt.Block.new(cid, chunk, verify = false): + return failure("Unable to init block from chunk!") + + cids.add(cid) + + if err =? (await self.networkStore.putBlock(blk)).errorOption: + error "Unable to store block", cid = blk.cid, err = err.msg + return failure(&"Unable to store block {blk.cid}") + pieceHashCtx.update(chunk) + discard pieceIter.next() + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + finally: + await stream.close() + + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err: + return failure(err) + pieces.add(mh) + + without tree =? CodexTree.init(cids), err: + return failure(err) + + without treeCid =? tree.rootCid(CIDv1, dataCodec), err: + return failure(err) + + for index, cid in cids: + without proof =? tree.getProof(index), err: + return failure(err) + if err =? + (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption: + # TODO add log here + return failure(err) + + let manifest = Manifest.new( + treeCid = treeCid, + blockSize = blockSize, + datasetSize = NBytes(chunker.offset), + version = CIDv1, + hcodec = hcodec, + codec = dataCodec, + filename = filename, + mimetype = mimetype, + ) + + without manifestBlk =? await self.storeManifest(manifest), err: + error "Unable to store manifest" + return failure(err) + + info "Stored data", + manifestCid = manifestBlk.cid, + treeCid = treeCid, + blocks = manifest.blocksCount, + datasetSize = manifest.datasetSize, + filename = manifest.filename, + mimetype = manifest.mimetype + + let info = BitTorrentInfo( + length: manifest.datasetSize.uint64, + pieceLength: pieceLength.uint32, + pieces: pieces, + name: filename, + ) + + let torrentManifest = + newBitTorrentManifest(info = info, codexManifestCid = manifestBlk.cid) + + return torrentManifest.success + proc storeTorrent*( self: CodexNodeRef, stream: LPStream, - info: BitTorrentInfo, - infoHash: MultiHash, + filename: ?string = string.none, mimetype: ?string = string.none, -): Future[?!Cid] {.async.} = +): Future[?!MultiHash] {.async.} = info "Storing BitTorrent data" - without codexManifestCid =? - await self.store( - stream, filename = info.name, mimetype = mimetype, blockSize = NBytes 1024 * 16 + without bitTorrentManifest =? + await self.storePieces( + stream, filename = filename, mimetype = mimetype, blockSize = NBytes 1024 * 16 ): return failure("Unable to store BitTorrent data") - let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid) + let infoBencoded = bencode(bitTorrentManifest.info) + + without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err: + return failure(err) without manifestBlk =? await self.storeBitTorrentManifest( bitTorrentManifest, infoHash @@ -671,10 +785,9 @@ proc storeTorrent*( error "Unable to store manifest" return failure(err) - info "Stored BitTorrent data", - manifestCid = manifestBlk.cid, codeManifestCid = codexManifestCid + info "Stored BitTorrent data", infoHash = $infoHash, codexManifestCid - success manifestBlk.cid + success infoHash proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest): diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 40fc0ae5..83bfc654 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -258,6 +258,80 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string = proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) = let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition + router.api(MethodOptions, "/api/codex/v1/torrent") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse: + ## Upload a file in a streaming manner + ## + + trace "Handling file upload" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500, msg = bodyReader.error()) + + # Attempt to handle `Expect` header + # some clients (curl), wait 1000ms + # before giving up + # + await request.handleExpect() + + var mimetype = request.headers.getString(ContentTypeHeader).some + + if mimetype.get() != "": + let mimetypeVal = mimetype.get() + var m = newMimetypes() + let extension = m.getExt(mimetypeVal, "") + if extension == "": + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) + else: + mimetype = string.none + + const ContentDispositionHeader = "Content-Disposition" + let contentDisposition = request.headers.getString(ContentDispositionHeader) + let filename = getFilenameFromContentDisposition(contentDisposition) + + if filename.isSome and not isValidFilename(filename.get()): + return RestApiResponse.error(Http422, "The filename is not valid.") + + # Here we could check if the extension matches the filename if needed + + let reader = bodyReader.get() + + try: + without infoHash =? ( + await node.storeTorrent( + AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)), + filename = filename, + mimetype = mimetype, + ) + ), error: + error "Error uploading file", exc = error.msg + return RestApiResponse.error(Http500, error.msg) + + codex_api_uploads.inc() + trace "Uploaded torrent", infoHash = $infoHash + return RestApiResponse.response(infoHash.hex) + except CancelledError: + trace "Upload cancelled error" + return RestApiResponse.error(Http500) + except AsyncStreamError: + trace "Async stream error" + return RestApiResponse.error(Http500) + finally: + await reader.closeWait() + router.api(MethodOptions, "/api/codex/v1/data") do( resp: HttpResponseRef ) -> RestApiResponse: