Download files without padding (#218)

The initial goal of this patch was to allow to download of a file via REST API in exactly the same size as it was uploaded, which required adding fields Chunker.offset and Manifest.originalBytes to keep that size. On top of that, we added more integrity checks to operations on Manifest, and reorganized TestNode.nim to test the actual interaction between node.store and node.retrieve operations.

Note that the wire format of Manifest was changed, so we need to recreate all BlockStores.

* Download without padding
* Fixed chunker tests
* Chunker: get rid of RabinChunker
* Verify offset in the chunker tests
* Use manifest.originalBytesPadded in StoreStream.size
* StoreStream: replace emptyBlock with zeroMem
* Manifest.bytes: compute how many bytes corresponding StoreStream(Manifest, pad) will return
* Manifest: verify originalBytes and originalLen on new/encode/decode
Also set originalBytes in each Manifest creation/update scenario
* Manifest: comments, split code into sections
* Reordered parameters to deal with int64 size in 32-bit builds
* TestNode.nim: combine Store and Retrieve tests
1. Instead of copy-pasting code from node.nim, new test calls node.store() and node.retrieve() in order to check that they can correctly store and then retrieve data
2. New test compares only file contents, manifest contents considered an implementation detail
3. New test chunks at odd chunkSize=BlockSize/1.618 in order to ensure that data retrieved correctly even when buffer sizes mismatch
* TestNode.nim: code refactoring
* Manifest.add: one more test
* Manifest.verify: return Result instead of raising Defect
* Node.store: added blockSize parameter
This commit is contained in:
Bulat-Ziganshin 2022-08-24 15:15:59 +03:00 committed by GitHub
parent 4bc701652f
commit f24ded0f76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 208 additions and 184 deletions

View File

@ -29,21 +29,14 @@ const
type
# default reader type
ChunkBuffer* = ptr UncheckedArray[byte]
Reader* =
proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
ChunkerType* {.pure.} = enum
FixedChunker
RabinChunker
Chunker* = object
reader*: Reader
case kind*: ChunkerType:
of FixedChunker:
chunkSize*: Natural
pad*: bool # pad last block if less than size
of RabinChunker:
discard
# Reader that splits input data into fixed-size chunks
Chunker* = ref object
reader*: Reader # Procedure called to actually read the data
offset*: int # Bytes read so far (position in the stream)
chunkSize*: Natural # Size of each chunk
pad*: bool # Pad last chunk to chunkSize?
FileChunker* = Chunker
LPStreamChunker* = Chunker
@ -59,6 +52,8 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
if read <= 0:
return @[]
c.offset += read
if not c.pad and buff.len > read:
buff.setLen(read)
@ -66,24 +61,18 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
func new*(
T: type Chunker,
kind = ChunkerType.FixedChunker,
reader: Reader,
chunkSize = DefaultChunkSize,
pad = true): T =
var chunker = Chunker(
kind: kind,
reader: reader)
if kind == ChunkerType.FixedChunker:
chunker.pad = pad
chunker.chunkSize = chunkSize
return chunker
T(reader: reader,
offset: 0,
chunkSize: chunkSize,
pad: pad)
proc new*(
T: type LPStreamChunker,
stream: LPStream,
kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize,
pad = true): T =
## create the default File chunker
@ -103,16 +92,14 @@ proc new*(
return res
Chunker.new(
kind = ChunkerType.FixedChunker,
T.new(
reader = reader,
pad = pad,
chunkSize = chunkSize)
chunkSize = chunkSize,
pad = pad)
proc new*(
T: type FileChunker,
file: File,
kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize,
pad = true): T =
## create the default File chunker
@ -136,8 +123,7 @@ proc new*(
return total
Chunker.new(
kind = ChunkerType.FixedChunker,
T.new(
reader = reader,
pad = pad,
chunkSize = chunkSize)
chunkSize = chunkSize,
pad = pad)

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
# This module implements serialization and deserialization of Manifest
import pkg/upraises
push: {.upraises: [].}
@ -29,6 +31,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
## multicodec container (Dag-pb) for now
##
? manifest.verify()
var pbNode = initProtoBuffer()
for c in manifest.blocks:
@ -52,6 +55,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
# optional uint32 blockSize = 2; # size of a single block
# optional uint32 blocksLen = 3; # total amount of blocks
# optional ErasureInfo erasure = 4; # erasure coding info
# optional uint64 originalBytes = 5;# exact file size
# }
# ```
#
@ -61,6 +65,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
header.write(1, cid.data.buffer)
header.write(2, manifest.blockSize.uint32)
header.write(3, manifest.len.uint32)
header.write(5, manifest.originalBytes.uint64)
if manifest.protected:
var erasureInfo = initProtoBuffer()
erasureInfo.write(1, manifest.K.uint32)
@ -86,6 +91,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
pbErasureInfo: ProtoBuffer
rootHash: seq[byte]
originalCid: seq[byte]
originalBytes: uint64
blockSize: uint32
blocksLen: uint32
originalLen: uint32
@ -106,6 +112,9 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
if pbHeader.getField(3, blocksLen).isErr:
return failure("Unable to decode `blocksLen` from manifest!")
if pbHeader.getField(5, originalBytes).isErr:
return failure("Unable to decode `originalBytes` from manifest!")
if pbHeader.getField(4, pbErasureInfo).isErr:
return failure("Unable to decode `erasureInfo` from manifest!")
@ -140,6 +149,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
var
self = Manifest(
rootHash: rootHashCid.some,
originalBytes: originalBytes.int,
blockSize: blockSize.int,
blocks: blocks,
hcodec: (? rootHashCid.mhash.mapFailure).mcodec,
@ -153,6 +163,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
self.originalCid = ? Cid.init(originalCid).mapFailure
self.originalLen = originalLen.int
? self.verify()
self.success
proc encode*(

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
# This module defines all operations on Manifest
import pkg/upraises
push: {.upraises: [].}
@ -18,16 +20,18 @@ import pkg/questionable/results
import pkg/chronicles
import ../errors
import ../utils
import ../blocktype
import ./types
import ./coders
############################################################
# Operations on block list
############################################################
func len*(self: Manifest): int =
self.blocks.len
func size*(self: Manifest): int =
self.blocks.len * self.blockSize
func `[]`*(self: Manifest, i: Natural): Cid =
self.blocks[i]
@ -43,9 +47,11 @@ func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) =
self.blocks[self.len - i.int] = item
proc add*(self: Manifest, cid: Cid) =
assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks
self.rootHash = Cid.none
trace "Adding cid to manifest", cid
self.blocks.add(cid)
self.originalBytes = self.blocks.len * self.blockSize
iterator items*(self: Manifest): Cid =
for b in self.blocks:
@ -58,6 +64,44 @@ iterator pairs*(self: Manifest): tuple[key: int, val: Cid] =
func contains*(self: Manifest, cid: Cid): bool =
cid in self.blocks
############################################################
# Various sizes and verification
############################################################
func bytes*(self: Manifest, pad = true): int =
## Compute how many bytes corresponding StoreStream(Manifest, pad) will return
if pad or self.protected:
self.len * self.blockSize
else:
self.originalBytes
func rounded*(self: Manifest): int =
## Number of data blocks in *protected* manifest including padding at the end
roundUp(self.originalLen, self.K)
func steps*(self: Manifest): int =
## Number of EC groups in *protected* manifest
divUp(self.originalLen, self.K)
func verify*(self: Manifest): ?!void =
## Check manifest correctness
##
let originalLen = (if self.protected: self.originalLen else: self.len)
if divUp(self.originalBytes, self.blockSize) != originalLen:
return failure newException(CodexError, "Broken manifest: wrong originalBytes")
if self.protected and (self.len != self.steps * (self.K + self.M)):
return failure newException(CodexError, "Broken manifest: wrong originalLen")
return success()
############################################################
# Cid computation
############################################################
template hashBytes(mh: MultiHash): seq[byte] =
## get the hash bytes of a multihash object
##
@ -95,15 +139,6 @@ proc makeRoot*(self: Manifest): ?!void =
success()
func rounded*(self: Manifest): int =
if (self.originalLen mod self.K) != 0:
return self.originalLen + (self.K - (self.originalLen mod self.K))
self.originalLen
func steps*(self: Manifest): int =
self.rounded div self.K # number of blocks per row
proc cid*(self: Manifest): ?!Cid =
## Generate a root hash using the treehash algorithm
##
@ -113,6 +148,11 @@ proc cid*(self: Manifest): ?!Cid =
(!self.rootHash).success
############################################################
# Constructors
############################################################
proc new*(
T: type Manifest,
blocks: openArray[Cid] = [],
@ -133,6 +173,7 @@ proc new*(
codec: codec,
hcodec: hcodec,
blockSize: blockSize,
originalBytes: blocks.len * blockSize,
protected: protected).success
proc new*(
@ -148,6 +189,7 @@ proc new*(
version: manifest.version,
codec: manifest.codec,
hcodec: manifest.hcodec,
originalBytes: manifest.originalBytes,
blockSize: manifest.blockSize,
protected: true,
K: K, M: M,
@ -170,6 +212,7 @@ proc new*(
.catch
.get()
? self.verify()
self.success
proc new*(

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
# This module defines Manifest and all related types
import std/tables
import pkg/libp2p
import pkg/questionable
@ -25,9 +27,10 @@ const
type
Manifest* = ref object of RootObj
rootHash*: ?Cid # root (tree) hash of the contained data set
blockSize*: int # size of each contained block (might not be needed if blocks are len-prefixed)
blocks*: seq[Cid] # block Cid
rootHash*: ?Cid # Root (tree) hash of the contained data set
originalBytes*: int # Exact size of the original (uploaded) file
blockSize*: int # Size of each contained block (might not be needed if blocks are len-prefixed)
blocks*: seq[Cid] # Block Cid
version*: CidVersion # Cid version
hcodec*: MultiCodec # Multihash codec
codec*: MultiCodec # Data set codec

View File

@ -115,28 +115,33 @@ proc fetchBatched*(
proc retrieve*(
node: CodexNodeRef,
cid: Cid): Future[?!LPStream] {.async.} =
## Retrieve a block or manifest
## Retrieve by Cid a single block or an entire dataset described by manifest
##
if manifest =? (await node.fetchManifest(cid)):
if manifest.protected:
# Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async.} =
try:
without res =? (await node.erasure.decode(manifest)), error: # spawn an erasure decoding job
# Spawn an erasure decoding job
without res =? (await node.erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", cid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", cid
#
asyncSpawn erasureJob()
else:
# Prefetch the entire dataset into the local store
proc prefetchBlocks() {.async, raises: [Defect].} =
try:
discard await node.fetchBatched(manifest)
except CatchableError as exc:
trace "Exception prefetching blocks", exc = exc.msg
#
asyncSpawn prefetchBlocks()
return LPStream(StoreStream.new(node.blockStore, manifest)).success
#
# Retrieve all blocks of the dataset sequentially from the local store or network
return LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success
let
stream = BufferStream.new()
@ -158,14 +163,18 @@ proc retrieve*(
proc store*(
node: CodexNodeRef,
stream: LPStream): Future[?!Cid] {.async.} =
stream: LPStream,
blockSize = BlockSize): Future[?!Cid] {.async.} =
## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest
##
trace "Storing data"
without var blockManifest =? Manifest.new():
without var blockManifest =? Manifest.new(blockSize = blockSize):
return failure("Unable to create Block Set")
let
chunker = LPStreamChunker.new(stream, chunkSize = BlockSize)
# Manifest and chunker should use the same blockSize
let chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
try:
while (
@ -189,6 +198,7 @@ proc store*(
await stream.close()
# Generate manifest
blockManifest.originalBytes = chunker.offset # store the exact file size
without data =? blockManifest.encode():
return failure(
newException(CodexError, "Could not generate dataset manifest!"))

View File

@ -30,25 +30,29 @@ logScope:
topics = "dagger storestream"
type
# Make SeekableStream from a sequence of blocks stored in Manifest
# (only original file data - see StoreStream.size)
StoreStream* = ref object of SeekableStream
store*: BlockStore
manifest*: Manifest
emptyBlock*: seq[byte]
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
proc new*(
T: type StoreStream,
store: BlockStore,
manifest: Manifest): T =
manifest: Manifest,
pad = true): T =
result = T(
store: store,
manifest: manifest,
offset: 0,
emptyBlock: newSeq[byte](manifest.blockSize))
pad: pad,
offset: 0)
result.initStream()
method `size`*(self: StoreStream): int =
self.manifest.len * self.manifest.blockSize
bytes(self.manifest, self.pad)
proc `size=`*(self: StoreStream, size: int)
{.error: "Setting the size is forbidden".} =
@ -78,7 +82,7 @@ method readOnce*(
let
blockNum = self.offset div self.manifest.blockSize
blockOffset = self.offset mod self.manifest.blockSize
readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset)
readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize - blockOffset])
# Read contents of block `blockNum`
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
@ -87,13 +91,10 @@ method readOnce*(
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
copyMem(
pbytes.offset(read),
if blk.isEmpty:
self.emptyBlock[blockOffset].addr
zeroMem(pbytes.offset(read), readBytes)
else:
blk.data[blockOffset].addr,
readBytes)
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes
@ -102,12 +103,6 @@ method readOnce*(
return read
method closeImpl*(self: StoreStream) {.async.} =
try:
trace "Closing StoreStream"
self.offset = self.manifest.len * self.manifest.blockSize # set Eof
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Error closing StoreStream", msg = exc.msg
self.offset = self.size # set Eof
await procCall LPStream(self).closeImpl()

View File

@ -2,3 +2,14 @@ import ./utils/asyncheapqueue
import ./utils/fileutils
export asyncheapqueue, fileutils
func divUp*[T](a, b : T): T =
## Division with result rounded up (rather than truncated as in 'div')
assert(b != 0)
if a==0: 0 else: ((a - 1) div b) + 1
func roundUp*[T](a, b : T): T =
## Round up 'a' to the next value divisible by 'b'
divUp(a,b) * b

View File

@ -13,7 +13,6 @@ type
proc new*(
T: type RandomChunker,
rng: Rng,
kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize,
size: int,
pad = false): T =
@ -43,7 +42,6 @@ proc new*(
return read
Chunker.new(
kind = ChunkerType.FixedChunker,
reader = reader,
pad = pad,
chunkSize = chunkSize)

View File

@ -11,12 +11,14 @@ suite "Chunking":
let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0]
proc reader(data: ChunkBuffer, len: int): Future[int]
{.gcsafe, async, raises: [Defect].} =
if offset >= contents.len:
let read = min(contents.len - offset, len)
if read == 0:
return 0
copyMem(data, unsafeAddr contents[offset], len)
offset += 2
return len
copyMem(data, unsafeAddr contents[offset], read)
offset += read
return read
let chunker = Chunker.new(
reader = reader,
@ -29,9 +31,9 @@ suite "Chunking":
(await chunker.getBytes()) == [7.byte, 8]
(await chunker.getBytes()) == [9.byte, 0]
(await chunker.getBytes()) == []
chunker.offset == offset
test "should chunk LPStream":
var offset = 0
let stream = BufferStream.new()
let chunker = LPStreamChunker.new(
stream = stream,
@ -51,6 +53,7 @@ suite "Chunking":
(await chunker.getBytes()) == [7.byte, 8]
(await chunker.getBytes()) == [9.byte, 0]
(await chunker.getBytes()) == []
chunker.offset == 10
await writerFut
@ -69,4 +72,7 @@ suite "Chunking":
check buff.len <= fileChunker.chunkSize
data.add(buff)
check string.fromBytes(data) == readFile(path)
check:
string.fromBytes(data) == readFile(path)
fileChunker.offset == data.len

View File

@ -1,5 +1,6 @@
import std/os
import std/options
import std/math
import pkg/asynctest
import pkg/chronos
@ -39,6 +40,39 @@ suite "Test Node":
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
# Collect blocks from Chunker into Manifest
var
manifest = Manifest.new().tryGet()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = bt.Block.new(chunk).tryGet()
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
return manifest
proc retrieve(cid: Cid): Future[seq[byte]] {.async.} =
# Retrieve an entire file contents by file Cid
let
oddChunkSize = math.trunc(BlockSize/1.359).int # Let's check that node.retrieve can correctly rechunk data
stream = (await node.retrieve(cid)).tryGet()
var
data: seq[byte]
while not stream.atEof:
var
buf = newSeq[byte](oddChunkSize)
res = await stream.readOnce(addr buf[0], oddChunkSize)
check res <= oddChunkSize
buf.setLen(res)
data &= buf
return data
setup:
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file, chunkSize = BlockSize)
@ -61,18 +95,9 @@ suite "Test Node":
await node.stop()
test "Fetch Manifest":
var
manifest = Manifest.new().tryGet()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = bt.Block.new(chunk).tryGet()
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
let
manifest = await Manifest.fetch(chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec
@ -88,115 +113,51 @@ suite "Test Node":
fetched.blocks == manifest.blocks
test "Block Batching":
var
manifest = Manifest.new().tryGet()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = bt.Block.new(chunk).tryGet()
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec
).tryGet()
manifest = await Manifest.fetch(chunker)
for batchSize in 1..12:
(await node.fetchBatched(
manifest,
batchSize = 3,
batchSize = batchSize,
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= 3
check blocks.len > 0 and blocks.len <= batchSize
)).tryGet()
(await node.fetchBatched(
manifest,
batchSize = 6,
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= 6
)).tryGet()
(await node.fetchBatched(
manifest,
batchSize = 9,
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= 9
)).tryGet()
(await node.fetchBatched(
manifest,
batchSize = 11,
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= 11
)).tryGet()
test "Store Data Stream":
test "Store and retrieve Data Stream":
let
stream = BufferStream.new()
storeFut = node.store(stream)
oddChunkSize = math.trunc(BlockSize/1.618).int # Let's check that node.store can correctly rechunk these odd chunks
oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue
var
manifest = Manifest.new().tryGet()
original: seq[byte]
try:
while (
let chunk = await chunker.getBytes();
let chunk = await oddChunker.getBytes();
chunk.len > 0):
original &= chunk
await stream.pushData(chunk)
manifest.add(bt.Block.new(chunk).tryGet().cid)
finally:
await stream.pushEof()
await stream.close()
let
manifestCid = (await storeFut).tryGet()
check:
(await localStore.hasBlock(manifestCid)).tryGet()
var
let
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
localManifest = Manifest.decode(manifestBlock).tryGet()
check:
manifest.len == localManifest.len
manifest.cid == localManifest.cid
test "Retrieve Data Stream":
var
manifest = Manifest.new().tryGet()
original: seq[byte]
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = bt.Block.new(chunk).tryGet()
original &= chunk
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec
).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let stream = (await node.retrieve(manifestBlock.cid)).tryGet()
var data: seq[byte]
while not stream.atEof:
var
buf = newSeq[byte](BlockSize)
res = await stream.readOnce(addr buf[0], BlockSize div 2)
buf.setLen(res)
data &= buf
check data == original
data = await retrieve(manifestCid)
check:
data.len == localManifest.originalBytes
data.len == original.len
sha256.digest(data) == sha256.digest(original)
test "Retrieve One Block":
let

2
vendor/questionable vendored

@ -1 +1 @@
Subproject commit 82408a5ca2c24eca411d6774cf83816877170627
Subproject commit 6018fd43e033d5a5310faa45bcaa1b44049469a4