mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-02-09 08:03:21 +00:00
adds torrent uploading API
This commit is contained in:
parent
2ab59f616d
commit
fece905431
1
.gitignore
vendored
1
.gitignore
vendored
@ -47,3 +47,4 @@ nim.cfg
|
||||
tests/integration/logs
|
||||
|
||||
data/
|
||||
data40k.bin
|
||||
133
codex/node.nim
133
codex/node.nim
@ -648,22 +648,136 @@ proc store*(
|
||||
|
||||
return manifestBlk.cid.success
|
||||
|
||||
proc storePieces*(
|
||||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
filename: ?string = string.none,
|
||||
mimetype: ?string = string.none,
|
||||
blockSize: NBytes,
|
||||
pieceLength = NBytes 1024 * 64,
|
||||
): Future[?!BitTorrentManifest] {.async.} =
|
||||
## Save stream contents as dataset with given blockSize
|
||||
## to nodes's BlockStore, and return Cid of its manifest
|
||||
##
|
||||
info "Storing data"
|
||||
|
||||
let
|
||||
hcodec = Sha256HashCodec
|
||||
dataCodec = BlockCodec
|
||||
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
|
||||
numOfBlocksPerPiece = pieceLength.int div blockSize.int
|
||||
|
||||
var
|
||||
cids: seq[Cid]
|
||||
pieces: seq[MultiHash]
|
||||
pieceHashCtx: sha1
|
||||
pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece)
|
||||
|
||||
pieceHashCtx.init()
|
||||
|
||||
try:
|
||||
while (let chunk = await chunker.getBytes(); chunk.len > 0):
|
||||
if pieceIter.finished:
|
||||
without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure,
|
||||
err:
|
||||
return failure(err)
|
||||
pieces.add(mh)
|
||||
pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece)
|
||||
pieceHashCtx.init()
|
||||
without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without blk =? bt.Block.new(cid, chunk, verify = false):
|
||||
return failure("Unable to init block from chunk!")
|
||||
|
||||
cids.add(cid)
|
||||
|
||||
if err =? (await self.networkStore.putBlock(blk)).errorOption:
|
||||
error "Unable to store block", cid = blk.cid, err = err.msg
|
||||
return failure(&"Unable to store block {blk.cid}")
|
||||
pieceHashCtx.update(chunk)
|
||||
discard pieceIter.next()
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
finally:
|
||||
await stream.close()
|
||||
|
||||
without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err:
|
||||
return failure(err)
|
||||
pieces.add(mh)
|
||||
|
||||
without tree =? CodexTree.init(cids), err:
|
||||
return failure(err)
|
||||
|
||||
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
|
||||
return failure(err)
|
||||
|
||||
for index, cid in cids:
|
||||
without proof =? tree.getProof(index), err:
|
||||
return failure(err)
|
||||
if err =?
|
||||
(await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
|
||||
# TODO add log here
|
||||
return failure(err)
|
||||
|
||||
let manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = blockSize,
|
||||
datasetSize = NBytes(chunker.offset),
|
||||
version = CIDv1,
|
||||
hcodec = hcodec,
|
||||
codec = dataCodec,
|
||||
filename = filename,
|
||||
mimetype = mimetype,
|
||||
)
|
||||
|
||||
without manifestBlk =? await self.storeManifest(manifest), err:
|
||||
error "Unable to store manifest"
|
||||
return failure(err)
|
||||
|
||||
info "Stored data",
|
||||
manifestCid = manifestBlk.cid,
|
||||
treeCid = treeCid,
|
||||
blocks = manifest.blocksCount,
|
||||
datasetSize = manifest.datasetSize,
|
||||
filename = manifest.filename,
|
||||
mimetype = manifest.mimetype
|
||||
|
||||
let info = BitTorrentInfo(
|
||||
length: manifest.datasetSize.uint64,
|
||||
pieceLength: pieceLength.uint32,
|
||||
pieces: pieces,
|
||||
name: filename,
|
||||
)
|
||||
|
||||
let torrentManifest =
|
||||
newBitTorrentManifest(info = info, codexManifestCid = manifestBlk.cid)
|
||||
|
||||
return torrentManifest.success
|
||||
|
||||
proc storeTorrent*(
|
||||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
info: BitTorrentInfo,
|
||||
infoHash: MultiHash,
|
||||
filename: ?string = string.none,
|
||||
mimetype: ?string = string.none,
|
||||
): Future[?!Cid] {.async.} =
|
||||
): Future[?!MultiHash] {.async.} =
|
||||
info "Storing BitTorrent data"
|
||||
|
||||
without codexManifestCid =?
|
||||
await self.store(
|
||||
stream, filename = info.name, mimetype = mimetype, blockSize = NBytes 1024 * 16
|
||||
without bitTorrentManifest =?
|
||||
await self.storePieces(
|
||||
stream, filename = filename, mimetype = mimetype, blockSize = NBytes 1024 * 16
|
||||
):
|
||||
return failure("Unable to store BitTorrent data")
|
||||
|
||||
let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)
|
||||
let infoBencoded = bencode(bitTorrentManifest.info)
|
||||
|
||||
without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without manifestBlk =? await self.storeBitTorrentManifest(
|
||||
bitTorrentManifest, infoHash
|
||||
@ -671,10 +785,9 @@ proc storeTorrent*(
|
||||
error "Unable to store manifest"
|
||||
return failure(err)
|
||||
|
||||
info "Stored BitTorrent data",
|
||||
manifestCid = manifestBlk.cid, codeManifestCid = codexManifestCid
|
||||
info "Stored BitTorrent data", infoHash = $infoHash, codexManifestCid
|
||||
|
||||
success manifestBlk.cid
|
||||
success infoHash
|
||||
|
||||
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
|
||||
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
|
||||
|
||||
@ -258,6 +258,80 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
|
||||
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
|
||||
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/torrent") do(
|
||||
resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("POST", corsOrigin)
|
||||
resp.setHeader(
|
||||
"Access-Control-Allow-Headers", "content-type, content-disposition"
|
||||
)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse:
|
||||
## Upload a file in a streaming manner
|
||||
##
|
||||
|
||||
trace "Handling file upload"
|
||||
var bodyReader = request.getBodyReader()
|
||||
if bodyReader.isErr():
|
||||
return RestApiResponse.error(Http500, msg = bodyReader.error())
|
||||
|
||||
# Attempt to handle `Expect` header
|
||||
# some clients (curl), wait 1000ms
|
||||
# before giving up
|
||||
#
|
||||
await request.handleExpect()
|
||||
|
||||
var mimetype = request.headers.getString(ContentTypeHeader).some
|
||||
|
||||
if mimetype.get() != "":
|
||||
let mimetypeVal = mimetype.get()
|
||||
var m = newMimetypes()
|
||||
let extension = m.getExt(mimetypeVal, "")
|
||||
if extension == "":
|
||||
return RestApiResponse.error(
|
||||
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
|
||||
)
|
||||
else:
|
||||
mimetype = string.none
|
||||
|
||||
const ContentDispositionHeader = "Content-Disposition"
|
||||
let contentDisposition = request.headers.getString(ContentDispositionHeader)
|
||||
let filename = getFilenameFromContentDisposition(contentDisposition)
|
||||
|
||||
if filename.isSome and not isValidFilename(filename.get()):
|
||||
return RestApiResponse.error(Http422, "The filename is not valid.")
|
||||
|
||||
# Here we could check if the extension matches the filename if needed
|
||||
|
||||
let reader = bodyReader.get()
|
||||
|
||||
try:
|
||||
without infoHash =? (
|
||||
await node.storeTorrent(
|
||||
AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
|
||||
filename = filename,
|
||||
mimetype = mimetype,
|
||||
)
|
||||
), error:
|
||||
error "Error uploading file", exc = error.msg
|
||||
return RestApiResponse.error(Http500, error.msg)
|
||||
|
||||
codex_api_uploads.inc()
|
||||
trace "Uploaded torrent", infoHash = $infoHash
|
||||
return RestApiResponse.response(infoHash.hex)
|
||||
except CancelledError:
|
||||
trace "Upload cancelled error"
|
||||
return RestApiResponse.error(Http500)
|
||||
except AsyncStreamError:
|
||||
trace "Async stream error"
|
||||
return RestApiResponse.error(Http500)
|
||||
finally:
|
||||
await reader.closeWait()
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/data") do(
|
||||
resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user