diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim index 8bcf9436..51db65e6 100644 --- a/codex/erasure/asyncbackend.nim +++ b/codex/erasure/asyncbackend.nim @@ -9,6 +9,7 @@ import std/sequtils import std/sugar +import std/macros import pkg/taskpools import pkg/taskpools/flowvars @@ -23,6 +24,30 @@ import ./backend import ../errors import ../logutils +#nim-taskpool imports + +import + system/ansi_c, + std/[random, cpuinfo, atomics, macros], + pkg/taskpools/channels_spsc_single, + pkg/taskpools/chase_lev_deques, + pkg/taskpools/event_notifiers, + pkg/taskpools/primitives/[barriers, allocs], + pkg/taskpools/instrumentation/[contracts, loggers], + pkg/taskpools/sparsesets, + pkg/taskpools/flowvars, + pkg/taskpools/ast_utils + + +when (NimMajor,NimMinor,NimPatch) >= (1,6,0): + import std/[isolation, tasks] + export isolation +else: + import pkg/taskpools/shims_pre_1_6/tasks + +import + std/[cpuinfo, atomics, macros] + logScope: topics = "codex asyncerasure" @@ -56,10 +81,52 @@ type proc dumpOf(prefix: string, bytes: seq[seq[byte]]): void = for i in 0.. 0: io2.writeFile(prefix & $i, bytes[i]).tryGet() -proc hashOf(bytes: ref seq[seq[byte]]): string = +template tempHashOf(bytes: untyped): untyped = + echo "len is " & $len(bytes) + var totalLen = 0 + for i in 0.. 0: + echo "pointer " & $i & " " & (unsafeAddr bytes[i][0]).repr + copyMem(addr buf[offset], unsafeAddr bytes[i][0], bytes[i].len) + offset = offset + bytes[i].len + + let mhash = MultiHash.digest("sha2-256", buf) + mhash.get().hex + + +proc hashOf(bytes: seq[seq[byte]]): string = + var totalLen = 0 + for i in 0.. 0: + copyMem(addr buf[offset], unsafeAddr bytes[i][0], bytes[i].len) + offset = offset + bytes[i].len + + let mhash = MultiHash.digest("sha2-256", buf) + return mhash.get().hex + + +proc hashOfRef(bytes: ref seq[seq[byte]]): string = var totalLen = 0 for i in 0.. 0: + echo "pointer " & $i & " " & bytes[i].repr copyMem(addr buf[offset], bytes[i], l) offset = offset + l @@ -128,6 +195,8 @@ proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[byte]], debug: bool): DecodeTaskResult = + echo "starting task code" + if debug: dumpOf("thread_data_", odata) dumpOf("thread_parity", oparity) @@ -143,6 +212,10 @@ proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[by var ptrsParity: seq[pointer] for i in 0.. 0: ptrsParity.add(unsafeAddr oparity[i][0]) else: @@ -199,12 +272,101 @@ proc proxySpawnDecodeTask( data: ref seq[seq[byte]], parity: ref seq[seq[byte]] ): Flowvar[DecodeTaskResult] = - let h = hashOf(data) - echo "proxy hash of data " & h + # let h = hashOfRef(data) + # echo "proxy hash of data " & h - let debug = h == "12209C9675C6D0F65E90554E4251EAA8B4F1DE46E8178FD885B98A607F127C64C5C3" + let h1 = tempHashOf(data[]) + echo "proxy hash of data " & h1 - tp.spawn decodeTask(args, data[], parity[], debug) + let ph1 = tempHashOf(parity[]) + echo "proxy hash of parity " & ph1 + + let debug = h1 == "12208A6C662044230A1760A4E1EE77D5D6A4C9176BA8E04F15C532FBB3D06D42F0D3" + + + let fut = newFlowVar(typeof(DecodeTaskResult)) + proc taskpool_decodeTask(args: DecodeTaskArgs; odata: seq[seq[byte]]; + oparity: seq[seq[byte]]; debug: bool; + fut: Flowvar[DecodeTaskResult]) {.nimcall.} = + let resgensym115 = decodeTask(args, odata, oparity, debug) + readyWith(fut, resgensym115) + + type + ScratchObj_11005855178 = object + args: DecodeTaskArgs + odata: seq[seq[byte]] + oparity: seq[seq[byte]] + debug: bool + fut: Flowvar[EncodeTaskResult] + + let scratch_11005855162 = cast[ptr ScratchObj_11005855178](c_calloc( + csize_t(1), csize_t(64))) + if isNil(scratch_11005855162): + raise + (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil) + block: + var isoTemp_11005855168 = isolate(args) + scratch_11005855162.args = extract(isoTemp_11005855168) + var isoTemp_11005855170: Isolated[seq[seq[byte]]] = isolate(data[]) + + let h2 = tempHashOf(isoTemp_11005855170.value) + echo "proxy hash of isolated data " & h2 + + scratch_11005855162.odata = extract(isoTemp_11005855170) + + let h3 = tempHashOf(scratch_11005855162.odata) + echo "proxy hash of moved data " & h3 + + + + var isoTemp_11005855172 = isolate(parity[]) + + let ph2 = tempHashOf(isoTemp_11005855172.value) + echo "proxy hash of isolated parity " & ph2 + + scratch_11005855162.oparity = extract(isoTemp_11005855172) + + GC_fullCollect() + + # let ph3 = tempHashOf(scratch_11005855162.oparity) + # echo "proxy hash of moved parity " & ph3 + + # let h22 = tempHashOf(isoTemp_11005855170.value) + # echo "proxy hash of isolated data 2 " & h22 + + # let h32 = tempHashOf(scratch_11005855162.odata) + # echo "proxy hash of moved data 2 " & h32 + + var isoTemp_11005855174 = isolate(debug) + scratch_11005855162.debug = extract(isoTemp_11005855174) + var isoTemp_11005855176 = isolate(fut) + scratch_11005855162.fut = extract(isoTemp_11005855176) + proc taskpool_decodeTask_11005855179(argsgensym120: pointer) {.gcsafe, + nimcall.} = + let objTemp_11005855167 = cast[ptr ScratchObj_11005855178](argsgensym120) + let args_11005855169 = objTemp_11005855167.args + let odata_11005855171 = objTemp_11005855167.odata + let oparity_11005855173 = objTemp_11005855167.oparity + let debug_11005855175 = objTemp_11005855167.debug + let fut_11005855177 = objTemp_11005855167.fut + taskpool_decodeTask(args_11005855169, odata_11005855171, oparity_11005855173, + debug_11005855175, fut_11005855177) + + proc destroyScratch_11005855180(argsgensym120: pointer) {.gcsafe, nimcall.} = + let obj_11005855181 = cast[ptr ScratchObj_11005855178](argsgensym120) + `=destroy`(obj_11005855181[]) + + let task = Task(callback: taskpool_decodeTask_11005855179, args: scratch_11005855162, + destroy: destroyScratch_11005855180) + + let taskNode = new(TaskNode, taskpools.workerContext.currentTask, task) + schedule(taskpools.workerContext, taskNode) + fut + + # expandMacros: + # tp.spawn decodeTask(args, data[], parity[], debug) + + # tp.spawn decodeTask(args, data[], parity[], debug) # let res = DecodeTaskResult.newFlowVar # res.readyWith(decodeTask(args, data[], parity[], debug)) @@ -271,8 +433,8 @@ proc asyncDecode*( without signal =? ThreadSignalPtr.new().mapFailure, err: return failure(err) - echo "orig hash of data " & hashOf(data) - # echo "hash of parity " & hashOf(parity) + echo "orig hash of data " & hashOfRef(data) + # echo "hash of parity " & hashOfRef(parity) try: let @@ -293,7 +455,7 @@ proc asyncDecode*( recovered[i] = newSeq[byte](blockSize) copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize) - # echo "orig hash of recovered " & hashOf(recovered) + # echo "orig hash of recovered " & hashOfRef(recovered) var ptrs: seq[pointer] @@ -303,7 +465,7 @@ proc asyncDecode*( # echo "unsafe hash of recovered" & unsafeHashOf(ptrs, recovered[].mapIt(it.len)) - # echo "orig hash of parity " & hashOf(parity) + # echo "orig hash of parity " & hashOfRef(parity) deallocShared(res.value.data) diff --git a/tests/codex/rndtesterasure.nim b/tests/codex/rndtesterasure.nim index e4abae2f..c555c459 100644 --- a/tests/codex/rndtesterasure.nim +++ b/tests/codex/rndtesterasure.nim @@ -41,6 +41,7 @@ suite "Erasure encode/decode": let blockSize = 16.KiBs # for blockSize in @(1..<8).mapIt(it * 1024): # echo $blockSize + let file = open("test_file.bin")