11 KiB
#codex/upload #codex/download
| related | Codex Blocks, Block Storage, Codex Block Exchange Protocol |
|---|
For a high level overview of how Codex works, you can check the Codex-BitTorrent Integration presentation slides.
To have a more detailed overview of the block exchange architecture, please refer to Codex Block Exchange Architecture.
We upload the content with API /api/codex/v1/data. The handler defined in codex/rest/api.nim calls CodexNodeRef.store and then returns the Cid of the manifest file corresponding to the contents:
without cid =? (
await node.store(
AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
filename = filename,
mimetype = mimetype,
)
), error:
error "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg)
codex_api_uploads.inc()
trace "Uploaded file", cid
return RestApiResponse.response($cid)
See Using Codex in the Codex docs on how to use the Codex client.
node.store (codex/node.nim) reads data from the stream, and from each chunk it does the following:
while (let chunk = await chunker.getBytes(); chunk.len > 0):
without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
return failure(err)
without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
return failure(err)
without blk =? bt.Block.new(cid, chunk, verify = false):
return failure("Unable to init block from chunk!")
cids.add(cid)
if err =? (await self.networkStore.putBlock(blk)).errorOption:
error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}")
The default chunk size is given by the default block size defined in codex/codextypes.nim:
const
# Size of blocks for storage / network exchange,
DefaultBlockSize* = NBytes 1024 * 64
storing blocks
Now, the networkStore.putBlock:
method putBlock*(
self: NetworkStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
## Store block locally and notify the network
##
let res = await self.localStore.putBlock(blk, ttl)
if res.isErr:
return res
await self.engine.resolveBlocks(@[blk])
return success()
We first store the stuff locally:
method putBlock*(
self: RepoStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds
without res =? await self.storeBlock(blk, expiry), err:
return failure(err)
if res.kind == Stored:
trace "Block Stored"
if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# rollback changes
without delRes =? await self.tryDeleteBlock(blk.cid), err:
return failure(err)
return failure(err)
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err)
if onBlock =? self.onBlockStored:
await onBlock(blk.cid)
else:
trace "Block already exists"
return success()
The storeBlock defined in codex/stores/repostore/operations.nim is where we store the block and the metadata.
proc storeBlock*(
self: RepoStore, blk: Block, minExpiry: SecondsSince1970
): Future[?!StoreResult] {.async.} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
return failure(err)
without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
return failure(err)
await self.metaDs.modifyGet(
...
)
The modifyGet deserve separate treatment.
modifyGet
modifyGet is called on the metaDs. metaDs has a wrapper type:
TypedDatastore* = ref object of RootObj
ds*: Datastore
And ds above is:
type
LevelDbDatastore* = ref object of Datastore
db: LevelDb
locks: TableRef[Key, AsyncLock]
There is a cascade of callbacks going from RepoStore through TypedDatastore down to LevelDbDataStore as presented on the following sequence diagram:
The diagram above can also be viewed online (requires Mermaid Chart account)
LevelDbDataStore directly interacts with the underlying storage and ensures atomicity of the modifyGet operation. TypedDatastore performs encoding and decoding of the data. Finally, RepoStore handles metadata creation or update, and also writes the actual block to the underlying block storage via its repoDS instance variable.
After the blocks are stored in repoDS, back in node.store (CodexNodeRef.store), we build the Merkle Tree for our block cids and then we compute its root (treeCid). Finally, for each block (cid) we compute the Codex Merkle Proofs, and we store each cid, block index, and proof under the computed treeCid:
without tree =? CodexTree.init(cids), err:
return failure(err)
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
return failure(err)
for index, cid in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =?
(await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)
This concludes the local block storage. We leave the description of engine.resolveBlocks(@[blk]) for later, when describing the block exchange protocol.
Downloading content
When we want to download the content from the network, we use /api/codex/v1/data/{cid}/network/stream API where we call await node.retrieveCid(cid.get(), local = false, resp = resp).
node.retrieveCid tries to get a stream (descendent of libp2p's LPStream):
without stream =? (await node.retrieve(cid, local)), error:
if error of BlockNotFoundError:
resp.status = Http404
return await resp.sendBody("")
else:
resp.status = Http500
return await resp.sendBody(error.msg)
This stream will be read chunk by chunk (DefaultBlockSize) and returned to the client.
To see what the stream really will be, we need to dive into node.retrieve(cid, local) (local is false in this case):
proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async.} =
## Retrieve by Cid a single block or an entire dataset described by manifest
##
if local and not await (cid in self.networkStore):
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
without manifest =? (await self.fetchManifest(cid)), err:
if err of AsyncTimeoutError:
return failure(err)
return await self.streamSingleBlock(cid)
await self.streamEntireDataset(manifest, cid)
We first try to get the manifest with self.fetchManifest(cid):
proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
## Fetch and decode a manifest block
##
if err =? cid.isManifest.errorOption:
return failure "CID has invalid content type for manifest {$cid}"
trace "Retrieving manifest for cid", cid
without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
trace "Error retrieve manifest block", cid, err = err.msg
return failure err
trace "Decoding manifest for cid", cid
without manifest =? Manifest.decode(blk), err:
trace "Unable to decode as manifest", err = err.msg
return failure("Unable to decode as manifest")
trace "Decoded manifest", cid
return manifest.success
Manifest is single block, and we get it with:
self.networkStore.getBlock(BlockAddress.init(cid))
Here BlockAddress.init(cid) reduces to BlockAddress(leaf: false, cid: cid), which means our Codex Blocks is just a Cid. getBlock will try to get the block from the localStore first:
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
error "Error getting block from local store", address, err = err.msg
return failure err
without newBlock =? (await self.engine.requestBlock(address)), err:
error "Unable to get block from exchange engine", address, err = err.msg
return failure err
return success newBlock
return success blk
It is RepoStore, which by default is set to be a FSDatastore:
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
Now, we have leaf set to false, thus we will be using simpler getBlock variant:
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
trace "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
If we do not have the block in the localStore, we will be trying to get it from the network with self.engine.requestBlock(address):
proc requestBlock*(
b: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
if not b.pendingBlocks.isInFlight(address):
let peers = b.peers.getPeersForBlock(address)
if peers.with.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
else:
let selected = pickPseudoRandom(address, peers.with)
asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(@[address], selected)
await b.sendWantHave(@[address], peers.without)
# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.
try:
success await blockFuture
except AsyncTimeoutError as err:
failure err
This is also where Codex WantList topic becomes relevant (and perhaps also Codex Block Exchange Protocol.
After we got the manifest we will proceed with creating a stream through which we will be stream the data down to the browser:
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
The stream abstraction provides a readOnce method which will be retrieving the blocks from the networkStore and sending the requested bytes down via the stream. readOnce is called in node.retrieveCid.