Block and parity dump

This commit is contained in:
Tomasz Bekas 2024-07-03 11:33:17 +02:00 committed by Tomasz Bekas
parent 67facb4b2a
commit 8f73636a24
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
6 changed files with 364 additions and 6 deletions

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import std/sequtils import std/sequtils
import std/sugar
import pkg/taskpools import pkg/taskpools
import pkg/taskpools/flowvars import pkg/taskpools/flowvars
@ -15,6 +16,9 @@ import pkg/chronos
import pkg/chronos/threadsync import pkg/chronos/threadsync
import pkg/questionable/results import pkg/questionable/results
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/stew/io2
import ./backend import ./backend
import ../errors import ../errors
import ../logutils import ../logutils
@ -50,6 +54,49 @@ type
EncodeTaskResult = Result[SharedArrayHolder[byte], cstring] EncodeTaskResult = Result[SharedArrayHolder[byte], cstring]
DecodeTaskResult = Result[SharedArrayHolder[byte], cstring] DecodeTaskResult = Result[SharedArrayHolder[byte], cstring]
proc dumpOf(prefix: string, bytes: seq[seq[byte]]): void =
  ## Debug helper: writes every non-empty shard to its own file, named by
  ## appending the shard's index to `prefix`. Raises via `tryGet` when a
  ## write fails.
  for idx, shard in bytes:
    if shard.len > 0:
      io2.writeFile(prefix & $idx, shard).tryGet()
proc hashOf(bytes: ref seq[seq[byte]]): string =
  ## Debug helper: concatenates all shards in `bytes` into one buffer and
  ## returns the hex-encoded sha2-256 multihash of the result.
  ## NOTE(review): `mhash.get()` asserts the digest succeeded — acceptable
  ## for a debug-only path.
  var totalLen = 0
  for i in 0..<len(bytes[]):
    totalLen = totalLen + bytes[i].len
  # Allocate the full buffer up front instead of growing an empty seq
  # with setLen.
  var buf = newSeq[byte](totalLen)
  var offset = 0
  for i in 0..<len(bytes[]):
    if bytes[i].len > 0:
      copyMem(addr buf[offset], addr bytes[i][0], bytes[i].len)
      offset = offset + bytes[i].len
  let mhash = MultiHash.digest("sha2-256", buf)
  return mhash.get().hex
proc unsafeHashOf(bytes: seq[pointer], lens: seq[int]): string =
  ## Debug helper: hashes the concatenation of `lens[i]` bytes read from
  ## each raw pointer in `bytes`, returning the hex-encoded sha2-256
  ## multihash. `bytes` and `lens` must be the same length; entries with
  ## `lens[i] == 0` are skipped, so their pointer is never dereferenced.
  var totalLen = 0
  for l in lens:
    totalLen = totalLen + l
  # Allocate the combined buffer up front instead of growing an empty seq
  # with setLen.
  var buf = newSeq[byte](totalLen)
  var offset = 0
  for i in 0..<lens.len:
    # Deliberate debug trace of each source pointer.
    echo "pointer " & $i & " " & bytes[i].repr
    let l = lens[i]
    if l > 0:
      copyMem(addr buf[offset], bytes[i], l)
      offset = offset + l
  let mhash = MultiHash.digest("sha2-256", buf)
  return mhash.get().hex
proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
var var
data = data.unsafeAddr data = data.unsafeAddr
@ -79,12 +126,42 @@ proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
if err =? args.signal.fireSync().mapFailure.errorOption(): if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg error "Error firing signal", msg = err.msg
proc decodeTask(args: DecodeTaskArgs, data: seq[seq[byte]], parity: seq[seq[byte]]): DecodeTaskResult = proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[byte]], debug: bool): DecodeTaskResult =
if debug:
dumpOf("thread_data_", odata)
dumpOf("thread_parity", oparity)
# if debugFlag:
# io2.writeFile("original_block_" & $idx, blk.data).tryGet()
var ptrsData: seq[pointer]
for i in 0..<odata.len:
if odata[i].len > 0:
ptrsData.add(unsafeAddr odata[i][0])
else:
ptrsData.add(unsafeAddr odata)
var ptrsParity: seq[pointer]
for i in 0..<oparity.len:
if oparity[i].len > 0:
ptrsParity.add(unsafeAddr oparity[i][0])
else:
ptrsParity.add(unsafeAddr oparity)
echo "bef unsafe hash of data " & unsafeHashOf(ptrsData, odata.mapIt(it.len))
echo "bef unsafe hash of parity " & unsafeHashOf(ptrsParity, oparity.mapIt(it.len))
var
data = odata.unsafeAddr
parity = oparity.unsafeAddr
var var
data = data.unsafeAddr
parity = parity.unsafeAddr
recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize)) recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize))
var ptrs: seq[pointer]
for i in 0..<recovered.len:
ptrs.add(unsafeAddr recovered[i][0])
try: try:
let res = args.backend[].decode(data[], parity[], recovered) let res = args.backend[].decode(data[], parity[], recovered)
@ -122,7 +199,17 @@ proc proxySpawnDecodeTask(
data: ref seq[seq[byte]], data: ref seq[seq[byte]],
parity: ref seq[seq[byte]] parity: ref seq[seq[byte]]
): Flowvar[DecodeTaskResult] = ): Flowvar[DecodeTaskResult] =
tp.spawn decodeTask(args, data[], parity[]) let h = hashOf(data)
echo "proxy hash of data " & h
let debug = h == "12209C9675C6D0F65E90554E4251EAA8B4F1DE46E8178FD885B98A607F127C64C5C3"
tp.spawn decodeTask(args, data[], parity[], debug)
# let res = DecodeTaskResult.newFlowVar
# res.readyWith(decodeTask(args, data[], parity[], debug))
# return res
proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
await wait(signal) await wait(signal)
@ -184,12 +271,17 @@ proc asyncDecode*(
without signal =? ThreadSignalPtr.new().mapFailure, err: without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err) return failure(err)
echo "orig hash of data " & hashOf(data)
# echo "hash of parity " & hashOf(parity)
try: try:
let let
ecK = data[].len ecK = data[].len
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK) args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
handle = proxySpawnDecodeTask(tp, args, data, parity) handle = proxySpawnDecodeTask(tp, args, data, parity)
# GC_fullCollect()
without res =? await awaitResult(signal, handle), err: without res =? await awaitResult(signal, handle), err:
return failure(err) return failure(err)
@ -201,6 +293,18 @@ proc asyncDecode*(
recovered[i] = newSeq[byte](blockSize) recovered[i] = newSeq[byte](blockSize)
copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize) copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)
# echo "orig hash of recovered " & hashOf(recovered)
var ptrs: seq[pointer]
for i in 0..<recovered[].len:
ptrs.add(unsafeAddr recovered[i][0])
# echo "unsafe hash of recovered" & unsafeHashOf(ptrs, recovered[].mapIt(it.len))
# echo "orig hash of parity " & hashOf(parity)
deallocShared(res.value.data) deallocShared(res.value.data)
return success(recovered) return success(recovered)
@ -209,3 +313,56 @@ proc asyncDecode*(
finally: finally:
if err =? signal.close().mapFailure.errorOption(): if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg error "Error closing signal", msg = $err.msg
proc syncDecode*(
  tp: Taskpool,
  backend: DecoderBackend,
  data, parity: ref seq[seq[byte]],
  blockSize: int
): Future[?!ref seq[seq[byte]]] {.async.} =
  ## Debug-only synchronous variant of the threaded decode path: runs the
  ## erasure decode directly on the calling thread and returns a freshly
  ## allocated copy of the `ecK` recovered shards.
  ## NOTE(review): `tp` is accepted for signature parity with the async
  ## variant but is unused here — confirm this is intentional.
  let
    ecK = data[].len
  var recovered = newSeqWith[seq[byte]](ecK, newSeq[byte](blockSize))
  # NOTE(review): the result of `decode` is not checked — a decode failure
  # would go unnoticed and the zero-initialized `recovered` shards would be
  # returned as-is; confirm this is acceptable for a debug path.
  backend.decode(data[], parity[], recovered)
  var recoveredRet = seq[seq[byte]].new()
  recoveredRet[].setLen(ecK)
  for i in 0..<recoveredRet[].len:
    recoveredRet[i] = newSeq[byte](blockSize)
    # Copy each recovered shard into the returned ref seq.
    copyMem(addr recoveredRet[i][0], addr recovered[i][0], blockSize)
  return success(recoveredRet)
  # Original threaded implementation, kept for reference:
  # without signal =? ThreadSignalPtr.new().mapFailure, err:
  #   return failure(err)
  # try:
  #   let
  #     ecK = data[].len
  #     args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
  #     handle = proxySpawnDecodeTask(tp, args, data, parity)
  #   without res =? await awaitResult(signal, handle), err:
  #     return failure(err)
  #   if res.isOk:
  #     var recovered = seq[seq[byte]].new()
  #     recovered[].setLen(ecK)
  #     for i in 0..<recovered[].len:
  #       recovered[i] = newSeq[byte](blockSize)
  #       copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)
  #     deallocShared(res.value.data)
  #     return success(recovered)
  #   else:
  #     return failure($res.error)
  # finally:
  #   if err =? signal.close().mapFailure.errorOption():
  #     error "Error closing signal", msg = $err.msg

View File

@ -31,6 +31,8 @@ import ../errors
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/stew/io2
import ./backend import ./backend
import ./asyncbackend import ./asyncbackend
@ -107,6 +109,50 @@ proc getPendingBlocks(
## Get pending blocks iterator ## Get pending blocks iterator
## ##
var
indiciesIter = Iter[int].new(indicies)
pendingBlocks = mapAsync[int, (?!bt.Block, int)](indiciesIter, (i: int) =>
self.store.getBlock(
BlockAddress.init(manifest.treeCid, i)
).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
)
pendingBlocks
proc getPendingBlocks2(
  self: Erasure,
  manifest: Manifest,
  indicies: seq[int]): AsyncIter[(?!bt.Block, int)] =
  ## Get pending blocks iterator
  ##
  ## Debug variant of `getPendingBlocks`: ignores the requested `indicies`
  ## except for the first element, which is used as a shift applied to a
  ## hard-coded set of block indices to fetch instead.
  ## NOTE(review): assumes `indicies` is non-empty — confirm callers
  ## guarantee this.
  let shift = indicies[0]
  # Hard-coded debug index pattern, offset by the step's first index.
  let newIndicies = @[0, 203, 196, 189, 182, 175, 168, 161, 154, 147, 140, 133, 126, 119, 112, 105, 98, 91, 84, 77].mapIt(it + shift)
  var
    indiciesIter = Iter[int].new(newIndicies)
    # Earlier debug selections, kept for reference:
    # indiciesIter = Iter[int].new(indicies.filterIt((it mod 3) != 2))
    # indiciesIter = Iter[int].new(@(@[indicies[29]] & indicies[5..<25]))
    pendingBlocks = mapAsync[int, (?!bt.Block, int)](indiciesIter, (i: int) =>
      self.store.getBlock(
        BlockAddress.init(manifest.treeCid, i)
      ).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
    )
  pendingBlocks
proc getPendingBlocks3(
self: Erasure,
manifest: Manifest,
indicies: seq[int]): AsyncIter[(?!bt.Block, int)] =
## Get pending blocks iterator
##
var var
# request blocks from the store # request blocks from the store
pendingBlocks = indicies.map( (i: int) => pendingBlocks = indicies.map( (i: int) =>
@ -190,6 +236,8 @@ proc prepareDecodingData(
## `emptyBlock` - the empty block to be used for padding ## `emptyBlock` - the empty block to be used for padding
## ##
var recIndicies = newSeq[int]()
let let
strategy = encoded.protectedStrategy.init( strategy = encoded.protectedStrategy.init(
firstIndex = 0, firstIndex = 0,
@ -197,7 +245,7 @@ proc prepareDecodingData(
iterations = encoded.steps iterations = encoded.steps
) )
indicies = toSeq(strategy.getIndicies(step)) indicies = toSeq(strategy.getIndicies(step))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies) pendingBlocksIter = self.getPendingBlocks2(encoded, indicies)
var var
dataPieces = 0 dataPieces = 0
@ -214,6 +262,8 @@ proc prepareDecodingData(
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue continue
recIndicies.add(idx)
let let
pos = indexToPos(encoded.steps, idx, step) pos = indexToPos(encoded.steps, idx, step)
@ -236,6 +286,13 @@ proc prepareDecodingData(
resolved.inc resolved.inc
let recCids = collect:
for i in recIndicies:
cids[i]
without recTree =? CodexTree.init(recCids), err:
return failure(err)
return success (dataPieces.Natural, parityPieces.Natural) return success (dataPieces.Natural, parityPieces.Natural)
proc init*( proc init*(
@ -317,6 +374,7 @@ proc encodeData(
trace "Adding parity block", cid = blk.cid, idx trace "Adding parity block", cid = blk.cid, idx
cids[idx] = blk.cid cids[idx] = blk.cid
io2.writeFile("parity_" & $idx, blk.data).get()
if isErr (await self.store.putBlock(blk)): if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!") return failure("Unable to store block!")
@ -372,6 +430,47 @@ proc encode*(
return success encodedManifest return success encodedManifest
proc hashOf(bytes: ref seq[seq[byte]]): string =
  ## Debug helper: concatenates all shards in `bytes` into one buffer and
  ## returns the hex-encoded sha2-256 multihash of the result.
  ## NOTE(review): `mhash.get()` asserts the digest succeeded — acceptable
  ## for a debug-only path.
  var totalLen = 0
  for i in 0..<len(bytes[]):
    totalLen = totalLen + bytes[i].len
  # Allocate the full buffer up front instead of growing an empty seq
  # with setLen.
  var buf = newSeq[byte](totalLen)
  var offset = 0
  for i in 0..<len(bytes[]):
    if bytes[i].len > 0:
      copyMem(addr buf[offset], addr bytes[i][0], bytes[i].len)
      offset = offset + bytes[i].len
  let mhash = MultiHash.digest("sha2-256", buf).mapFailure()
  return mhash.get().hex
  # Non-raising alternative, kept for reference:
  # without mh =? mhash, err:
  #   return "error " & err.msg
  # return mh.hex
# proc unsafeHashOf(bytes: seq[pointer], lens: seq[int]): string =
# var totalLen = 0
# for l in lens:
# totalLen = totalLen + l
# var buf = newSeq[byte]()
# buf.setLen(totalLen)
# var offset = 0
# for l in lens:
# if l > 0:
# copyMem(addr buf[offset], bytes[i], l)
# offset = offset + l
# let mhash = MultiHash.digest("sha2-256", buf)
# return $mhash
proc decode*( proc decode*(
self: Erasure, self: Erasure,
encoded: Manifest): Future[?!Manifest] {.async.} = encoded: Manifest): Future[?!Manifest] {.async.} =
@ -418,6 +517,10 @@ proc decode*(
trace "Error decoding data", err = err.msg trace "Error decoding data", err = err.msg
return failure(err) return failure(err)
echo "hash of recovered " & hashOf(recovered)
# GC_fullCollect()
for i in 0..<encoded.ecK: for i in 0..<encoded.ecK:
let idx = i * encoded.steps + step let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty: if data[i].len <= 0 and not cids[idx].isEmpty:

9
generate-mb.sh Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Generate a file of random bytes, sized in MiB.
# Example - generating 64 MiB file
# ./generate-mb.sh my_file.bin 64
set -euo pipefail

if [ "$#" -ne 2 ]; then
  echo "usage: $0 <filename> <size-in-MiB>" >&2
  exit 1
fi

fname=$1
size=$2

# bs=1048576 (1 MiB) is portable: GNU coreutils dd rejects the BSD-style
# lowercase "1m" suffix, while the numeric value works everywhere.
dd if=/dev/urandom of="$fname" bs=1048576 count="$size"

View File

@ -11,6 +11,8 @@ import pkg/codex/blockexchange
import pkg/codex/rng import pkg/codex/rng
import pkg/codex/utils import pkg/codex/utils
import pkg/stew/io2
import ./helpers/nodeutils import ./helpers/nodeutils
import ./helpers/randomchunker import ./helpers/randomchunker
import ./helpers/mockchunker import ./helpers/mockchunker
@ -84,6 +86,8 @@ proc makeWantList*(
proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest] {.async.} = proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest] {.async.} =
var cids = newSeq[Cid]() var cids = newSeq[Cid]()
var i = 0
while ( while (
let chunk = await chunker.getBytes(); let chunk = await chunker.getBytes();
chunk.len > 0): chunk.len > 0):
@ -92,6 +96,10 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest
cids.add(blk.cid) cids.add(blk.cid)
(await store.putBlock(blk)).tryGet() (await store.putBlock(blk)).tryGet()
io2.writeFile("block_" & $i, blk.data)
i.inc
let let
tree = CodexTree.init(cids).tryGet() tree = CodexTree.init(cids).tryGet()
treeCid = tree.rootCid.tryGet() treeCid = tree.rootCid.tryGet()

View File

@ -0,0 +1,64 @@
import std/sequtils
import std/sugar
import std/cpuinfo
import pkg/chronos
import pkg/datastore
import pkg/questionable/results
import pkg/codex/erasure
import pkg/codex/manifest
import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/utils
import pkg/codex/chunker
import pkg/taskpools
import ../asynctest
import ./helpers
# Round-trips a local file through erasure encode/decode and checks the
# decoded manifest matches the original.
suite "Erasure encode/decode":
  var store: BlockStore
  var erasure: Erasure
  var taskpool: Taskpool
  # Fresh temp LevelDB instances per suite run; destroyed in teardown.
  let repoTmp = TempLevelDb.new()
  let metaTmp = TempLevelDb.new()

  setup:
    let
      repoDs = repoTmp.newDb()
      metaDs = metaTmp.newDb()
    store = RepoStore.new(repoDs, metaDs)
    # One worker per CPU core for the encode/decode taskpool.
    taskpool = Taskpool.new(num_threads = countProcessors())
    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)

  teardown:
    await repoTmp.destroyDb()
    await metaTmp.destroyDb()

  test "Should encode/decode a file":
    let blockSize = 16.KiBs
    # Earlier sweep over block sizes, kept for reference:
    # for blockSize in @(1..<8).mapIt(it * 1024):
    # echo $blockSize
    let
      # NOTE(review): requires a pre-generated "test_file.bin" in the
      # working directory (see generate-mb.sh). The handle is never closed
      # here — confirm FileChunker takes ownership of it.
      file = open("test_file.bin")
      chunker = FileChunker.new(file = file, chunkSize = blockSize)
    let
      # K data shards, M parity shards.
      k = 20.Natural
      m = 10.Natural
    let manifest = await storeDataGetManifest(store, chunker)
    let encoded = (await erasure.encode(manifest, k, m)).tryGet()
    let decoded = (await erasure.decode(encoded)).tryGet()
    check:
      decoded.treeCid == manifest.treeCid
      decoded.treeCid == encoded.originalTreeCid
      decoded.blocksCount == encoded.originalBlocksCount

View File

@ -69,7 +69,24 @@ suite "Test Prover":
circomBackend = CircomCompat.init(r1cs, wasm) circomBackend = CircomCompat.init(r1cs, wasm)
prover = Prover.new(store, circomBackend, samples) prover = Prover.new(store, circomBackend, samples)
challenge = 1234567.toF.toBytes.toArray32 challenge = 1234567.toF.toBytes.toArray32
(inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet
proc someFut(): Future[void] {.async.} =
echo "before 100 millis"
await sleepAsync(100.millis)
echo "after 100 millis"
asyncSpawn(someFut())
echo "started proving"
let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet
echo "finished proving"
await sleepAsync(10.millis)
echo "after additional 10 millis"
await sleepAsync(300.millis)
echo "after additional 300 millis"
check: check:
(await prover.verify(proof, inputs)).tryGet == true (await prover.verify(proof, inputs)).tryGet == true