mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-09 12:35:51 +00:00
f24ded0f76
The initial goal of this patch was to allow downloading a file via the REST API in exactly the same size as it was uploaded, which required adding the fields Chunker.offset and Manifest.originalBytes to keep that size. On top of that, we added more integrity checks to operations on Manifest, and reorganized TestNode.nim to test the actual interaction between the node.store and node.retrieve operations. Note that the wire format of Manifest was changed, so we need to recreate all BlockStores. * Download without padding * Fixed chunker tests * Chunker: get rid of RabinChunker * Verify offset in the chunker tests * Use manifest.originalBytesPadded in StoreStream.size * StoreStream: replace emptyBlock with zeroMem * Manifest.bytes: compute how many bytes the corresponding StoreStream(Manifest, pad) will return * Manifest: verify originalBytes and originalLen on new/encode/decode. Also set originalBytes in each Manifest creation/update scenario * Manifest: comments, split code into sections * Reordered parameters to deal with int64 size in 32-bit builds * TestNode.nim: combine Store and Retrieve tests 1. Instead of copy-pasting code from node.nim, the new test calls node.store() and node.retrieve() in order to check that they can correctly store and then retrieve data 2. The new test compares only file contents; manifest contents are considered an implementation detail 3. The new test chunks at an odd chunkSize=BlockSize/1.618 in order to ensure that data is retrieved correctly even when buffer sizes mismatch * TestNode.nim: code refactoring * Manifest.add: one more test * Manifest.verify: return Result instead of raising Defect * Node.store: added blockSize parameter
173 lines
4.8 KiB
Nim
173 lines
4.8 KiB
Nim
import std/os
|
|
import std/options
|
|
import std/math
|
|
|
|
import pkg/asynctest
|
|
import pkg/chronos
|
|
import pkg/chronicles
|
|
import pkg/stew/byteutils
|
|
|
|
import pkg/nitro
|
|
import pkg/libp2p
|
|
import pkg/libp2pdht/discv5/protocol as discv5
|
|
|
|
import pkg/codex/stores
|
|
import pkg/codex/blockexchange
|
|
import pkg/codex/chunker
|
|
import pkg/codex/node
|
|
import pkg/codex/manifest
|
|
import pkg/codex/discovery
|
|
import pkg/codex/blocktype as bt
|
|
|
|
import ./helpers
|
|
|
|
suite "Test Node":
  # Full path of this source file; `setup` uses it to locate the fixtures.
  let (path, _, _) = instantiationInfo(-2, fullPaths = true)

  # State shared by the helpers and tests below; assigned in `setup`.
  var
    file: File
    chunker: Chunker
    switch: Switch
    wallet: WalletRef
    network: BlockExcNetwork
    localStore: CacheStore
    engine: BlockExcEngine
    store: NetworkStore
    node: CodexNodeRef
    blockDiscovery: Discovery
    peerStore: PeerCtxStore
    pendingBlocks: PendingBlocksManager
    discovery: DiscoveryEngine
|
|
|
|
proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
  ## Drain `chunker` into a new Manifest.
  ##
  ## Every non-empty chunk is wrapped in a block, put into `localStore`,
  ## and its Cid is appended to the manifest that is returned.
  var collected = Manifest.new().tryGet()

  while true:
    let piece = await chunker.getBytes()
    if piece.len <= 0:
      # An empty chunk ends the collection loop.
      break

    let blk = bt.Block.new(piece).tryGet()
    (await localStore.putBlock(blk)).tryGet()
    collected.add(blk.cid)

  return collected
|
|
|
|
proc retrieve(cid: Cid): Future[seq[byte]] {.async.} =
  ## Read the whole stream returned by `node.retrieve(cid)` and return
  ## its bytes.
  ##
  ## Reads are issued with a buffer size that is deliberately not equal to
  ## `BlockSize`, to check that node.retrieve can correctly rechunk data.
  let
    readSize = math.trunc(BlockSize/1.359).int
    stream = (await node.retrieve(cid)).tryGet()

  var contents: seq[byte]

  while not stream.atEof:
    var buf = newSeq[byte](readSize)
    let got = await stream.readOnce(addr buf[0], readSize)
    check got <= readSize
    # Keep only the bytes that were actually read.
    buf.setLen(got)
    contents.add(buf)

  return contents
|
|
|
|
setup:
  # The fixture image lives in ../fixtures relative to this file's directory.
  let fixturePath = path.splitFile().dir /../ "fixtures" / "test.jpg"
  file = open(fixturePath)
  chunker = FileChunker.new(file = file, chunkSize = BlockSize)

  # Build the components in dependency order: each constructor below
  # receives objects created on the lines above it.
  switch = newStandardSwitch()
  wallet = WalletRef.new(EthPrivateKey.random())
  network = BlockExcNetwork.new(switch)
  localStore = CacheStore.new()
  blockDiscovery = Discovery.new(switch.peerInfo, Port(0))
  peerStore = PeerCtxStore.new()
  pendingBlocks = PendingBlocksManager.new()
  discovery = DiscoveryEngine.new(
    localStore, peerStore, network, blockDiscovery, pendingBlocks)
  engine = BlockExcEngine.new(
    localStore, wallet, network, discovery, peerStore, pendingBlocks)
  store = NetworkStore.new(engine, localStore)

  # TODO: pass `Erasure`
  node = CodexNodeRef.new(switch, store, engine, nil, blockDiscovery)

  await node.start()
|
|
|
|
teardown:
  # Release the fixture file opened in `setup`, then stop the node.
  file.close()
  await node.stop()
|
|
|
|
test "Fetch Manifest":
  # Build a manifest from the fixture chunker, put its encoded form into
  # the local store, then fetch it back through the node by Cid.
  let
    manifest = await Manifest.fetch(chunker)
    encoded = manifest.encode().tryGet()
    manifestBlock = bt.Block.new(encoded, codec = DagPBCodec).tryGet()

  (await localStore.putBlock(manifestBlock)).tryGet()

  let fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()

  check fetched.cid == manifest.cid
  check fetched.blocks == manifest.blocks
|
|
|
|
test "Block Batching":
  # For every batch size from 1 to 12, each batch handed to the callback
  # must be non-empty and no larger than the requested size.
  let manifest = await Manifest.fetch(chunker)

  for batchSize in countup(1, 12):
    (await node.fetchBatched(
      manifest,
      batchSize = batchSize,
      proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
        check blocks.len in 1..batchSize
    )).tryGet()
|
|
|
|
test "Store and retrieve Data Stream":
  let
    stream = BufferStream.new()
    # Start storing before any data is pushed; the future is awaited below.
    storeFut = node.store(stream)
    # Let's check that node.store can correctly rechunk these odd chunks
    oddChunkSize = math.trunc(BlockSize/1.618).int
    # TODO: doesn't work with pad=true
    oddChunker = FileChunker.new(
      file = file, chunkSize = oddChunkSize, pad = false)

  var original: seq[byte]

  try:
    # Feed the file into the stream chunk by chunk, keeping a copy of
    # everything that was pushed for the comparison at the end.
    while true:
      let chunk = await oddChunker.getBytes()
      if chunk.len <= 0:
        break
      original.add(chunk)
      await stream.pushData(chunk)
  finally:
    await stream.pushEof()
    await stream.close()

  let manifestCid = (await storeFut).tryGet()

  check:
    (await localStore.hasBlock(manifestCid)).tryGet()

  let
    manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
    localManifest = Manifest.decode(manifestBlock).tryGet()
    data = await retrieve(manifestCid)

  # Retrieved bytes must match what was pushed, in length and in digest.
  check:
    data.len == localManifest.originalBytes
    data.len == original.len
    sha256.digest(data) == sha256.digest(original)
|
|
|
|
test "Retrieve One Block":
  # Put a single block into the local store and read it back through
  # `node.retrieve`, expecting exactly the bytes that were stored.
  let
    payload = "Block 1"
    blk = bt.Block.new(payload.toBytes).tryGet()

  (await localStore.putBlock(blk)).tryGet()

  let stream = (await node.retrieve(blk.cid)).tryGet()

  var received = newSeq[byte](payload.len)
  await stream.readExactly(addr received[0], received.len)

  check string.fromBytes(received) == payload
|