diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim index f3bca851..8bcf9436 100644 --- a/codex/erasure/asyncbackend.nim +++ b/codex/erasure/asyncbackend.nim @@ -8,6 +8,7 @@ ## those terms. import std/sequtils +import std/sugar import pkg/taskpools import pkg/taskpools/flowvars @@ -15,6 +16,9 @@ import pkg/chronos import pkg/chronos/threadsync import pkg/questionable/results +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/stew/io2 + import ./backend import ../errors import ../logutils @@ -50,6 +54,49 @@ type EncodeTaskResult = Result[SharedArrayHolder[byte], cstring] DecodeTaskResult = Result[SharedArrayHolder[byte], cstring] +proc dumpOf(prefix: string, bytes: seq[seq[byte]]): void = + for i in 0.. 0: + io2.writeFile(prefix & $i, bytes[i]).tryGet() + +proc hashOf(bytes: ref seq[seq[byte]]): string = + var totalLen = 0 + for i in 0.. 0: + copyMem(addr buf[offset], addr bytes[i][0], bytes[i].len) + offset = offset + bytes[i].len + + let mhash = MultiHash.digest("sha2-256", buf) + return mhash.get().hex + +proc unsafeHashOf(bytes: seq[pointer], lens: seq[int]): string = + var totalLen = 0 + for l in lens: + totalLen = totalLen + l + + var buf = newSeq[byte]() + + buf.setLen(totalLen) + + var offset = 0 + for i in 0.. 0: + copyMem(addr buf[offset], bytes[i], l) + offset = offset + l + + let mhash = MultiHash.digest("sha2-256", buf) + return mhash.get().hex + proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = var data = data.unsafeAddr @@ -79,12 +126,42 @@ proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = if err =? args.signal.fireSync().mapFailure.errorOption(): error "Error firing signal", msg = err.msg -proc decodeTask(args: DecodeTaskArgs, data: seq[seq[byte]], parity: seq[seq[byte]]): DecodeTaskResult = +proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[byte]], debug: bool): DecodeTaskResult = + + if debug: + dumpOf("thread_data_", odata) + dumpOf("thread_parity", oparity) + # if debugFlag: + # io2.writeFile("original_block_" & $idx, blk.data).tryGet() + + var ptrsData: seq[pointer] + for i in 0.. 0: + ptrsData.add(unsafeAddr odata[i][0]) + else: + ptrsData.add(unsafeAddr odata) + + var ptrsParity: seq[pointer] + for i in 0.. 0: + ptrsParity.add(unsafeAddr oparity[i][0]) + else: + ptrsParity.add(unsafeAddr oparity) + + echo "bef unsafe hash of data " & unsafeHashOf(ptrsData, odata.mapIt(it.len)) + echo "bef unsafe hash of parity " & unsafeHashOf(ptrsParity, oparity.mapIt(it.len)) + + var + data = odata.unsafeAddr + parity = oparity.unsafeAddr + var - data = data.unsafeAddr - parity = parity.unsafeAddr recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize)) + var ptrs: seq[pointer] + for i in 0.. + self.store.getBlock( + BlockAddress.init(manifest.treeCid, i) + ).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) + ) + + + pendingBlocks + +proc getPendingBlocks2( + self: Erasure, + manifest: Manifest, + indicies: seq[int]): AsyncIter[(?!bt.Block, int)] = + ## Get pending blocks iterator + ## + + let shift = indicies[0] + + let newIndicies = @[0, 203, 196, 189, 182, 175, 168, 161, 154, 147, 140, 133, 126, 119, 112, 105, 98, 91, 84, 77].mapIt(it + shift) + + var + indiciesIter = Iter[int].new(newIndicies) + # indiciesIter = Iter[int].new(indicies.filterIt((it mod 3) != 2)) + # indiciesIter = Iter[int].new(@(@[indicies[29]] & indicies[5..<25])) + + + pendingBlocks = mapAsync[int, (?!bt.Block, int)](indiciesIter, (i: int) => + self.store.getBlock( + BlockAddress.init(manifest.treeCid, i) + ).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) + ) + + pendingBlocks + +proc getPendingBlocks3( + self: Erasure, + manifest: Manifest, + indicies: seq[int]): AsyncIter[(?!bt.Block, int)] = + ## Get pending blocks iterator + ## + var # request blocks from the store pendingBlocks = indicies.map( (i: int) => @@ -190,6 +236,8 @@ proc prepareDecodingData( ## `emptyBlock` - the empty block to be used for padding ## + var recIndicies = newSeq[int]() + let strategy = encoded.protectedStrategy.init( firstIndex = 0, @@ -197,7 +245,7 @@ proc prepareDecodingData( iterations = encoded.steps ) indicies = toSeq(strategy.getIndicies(step)) - pendingBlocksIter = self.getPendingBlocks(encoded, indicies) + pendingBlocksIter = self.getPendingBlocks2(encoded, indicies) var dataPieces = 0 @@ -214,6 +262,8 @@ proc prepareDecodingData( trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue + recIndicies.add(idx) + let pos = indexToPos(encoded.steps, idx, step) @@ -236,6 +286,13 @@ proc prepareDecodingData( resolved.inc + let recCids = collect: + for i in recIndicies: + cids[i] + + without recTree =? CodexTree.init(recCids), err: + return failure(err) + return success (dataPieces.Natural, parityPieces.Natural) proc init*( @@ -317,6 +374,7 @@ proc encodeData( trace "Adding parity block", cid = blk.cid, idx cids[idx] = blk.cid + io2.writeFile("parity_" & $idx, blk.data).get() if isErr (await self.store.putBlock(blk)): trace "Unable to store block!", cid = blk.cid return failure("Unable to store block!") @@ -372,6 +430,47 @@ proc encode*( return success encodedManifest +proc hashOf(bytes: ref seq[seq[byte]]): string = + var totalLen = 0 + for i in 0.. 0: + copyMem(addr buf[offset], addr bytes[i][0], bytes[i].len) + offset = offset + bytes[i].len + + let mhash = MultiHash.digest("sha2-256", buf).mapFailure() + + return mhash.get().hex + # without mh =? mhash, err: + # return "error " & err.msg + + # return mh.hex + +# proc unsafeHashOf(bytes: seq[pointer], lens: seq[int]): string = +# var totalLen = 0 +# for l in lens: +# totalLen = totalLen + l + +# var buf = newSeq[byte]() + +# buf.setLen(totalLen) + +# var offset = 0 +# for l in lens: +# if l > 0: +# copyMem(addr buf[offset], bytes[i], l) +# offset = offset + l + +# let mhash = MultiHash.digest("sha2-256", buf) +# return $mhash + proc decode*( self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = @@ -418,6 +517,10 @@ proc decode*( trace "Error decoding data", err = err.msg return failure(err) + echo "hash of recovered " & hashOf(recovered) + + # GC_fullCollect() + for i in 0.. 0): @@ -92,6 +96,10 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest cids.add(blk.cid) (await store.putBlock(blk)).tryGet() + io2.writeFile("block_" & $i, blk.data) + + i.inc + let tree = CodexTree.init(cids).tryGet() treeCid = tree.rootCid.tryGet() diff --git a/tests/codex/rndtesterasure.nim b/tests/codex/rndtesterasure.nim new file mode 100644 index 00000000..e4abae2f --- /dev/null +++ b/tests/codex/rndtesterasure.nim @@ -0,0 +1,64 @@ +import std/sequtils +import std/sugar +import std/cpuinfo + +import pkg/chronos +import pkg/datastore +import pkg/questionable/results + +import pkg/codex/erasure +import pkg/codex/manifest +import pkg/codex/stores +import pkg/codex/blocktype as bt +import pkg/codex/utils +import pkg/codex/chunker +import pkg/taskpools + +import ../asynctest +import ./helpers + +suite "Erasure encode/decode": + var store: BlockStore + var erasure: Erasure + var taskpool: Taskpool + let repoTmp = TempLevelDb.new() + let metaTmp = TempLevelDb.new() + + setup: + let + repoDs = repoTmp.newDb() + metaDs = metaTmp.newDb() + + store = RepoStore.new(repoDs, metaDs) + taskpool = Taskpool.new(num_threads = countProcessors()) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) + + teardown: + await repoTmp.destroyDb() + await metaTmp.destroyDb() + + test "Should encode/decode a file": + let blockSize = 16.KiBs + # for blockSize in @(1..<8).mapIt(it * 1024): + # echo $blockSize + + let + file = open("test_file.bin") + chunker = FileChunker.new(file = file, chunkSize = blockSize) + + + let + k = 20.Natural + m = 10.Natural + + let manifest = await storeDataGetManifest(store, chunker) + + let encoded = (await erasure.encode(manifest, k, m)).tryGet() + + let decoded = (await erasure.decode(encoded)).tryGet() + + check: + decoded.treeCid == manifest.treeCid + decoded.treeCid == encoded.originalTreeCid + decoded.blocksCount == encoded.originalBlocksCount + diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index 179047f3..22d01edf 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -69,7 +69,24 @@ suite "Test Prover": circomBackend = CircomCompat.init(r1cs, wasm) prover = Prover.new(store, circomBackend, samples) challenge = 1234567.toF.toBytes.toArray32 - (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet + + proc someFut(): Future[void] {.async.} = + echo "before 100 millis" + await sleepAsync(100.millis) + echo "after 100 millis" + + asyncSpawn(someFut()) + + echo "started proving" + let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet + echo "finished proving" + + await sleepAsync(10.millis) + echo "after additional 10 millis" + await sleepAsync(300.millis) + echo "after additional 300 millis" check: (await prover.verify(proof, inputs)).tryGet == true + +