Deterministic trigger of bug
This commit is contained in:
parent
8f73636a24
commit
6f4ee0e735
|
@ -9,6 +9,7 @@
|
|||
|
||||
import std/sequtils
|
||||
import std/sugar
|
||||
import std/macros
|
||||
|
||||
import pkg/taskpools
|
||||
import pkg/taskpools/flowvars
|
||||
|
@ -23,6 +24,30 @@ import ./backend
|
|||
import ../errors
|
||||
import ../logutils
|
||||
|
||||
#nim-taskpool imports
|
||||
|
||||
import
|
||||
system/ansi_c,
|
||||
std/[random, cpuinfo, atomics, macros],
|
||||
pkg/taskpools/channels_spsc_single,
|
||||
pkg/taskpools/chase_lev_deques,
|
||||
pkg/taskpools/event_notifiers,
|
||||
pkg/taskpools/primitives/[barriers, allocs],
|
||||
pkg/taskpools/instrumentation/[contracts, loggers],
|
||||
pkg/taskpools/sparsesets,
|
||||
pkg/taskpools/flowvars,
|
||||
pkg/taskpools/ast_utils
|
||||
|
||||
|
||||
when (NimMajor,NimMinor,NimPatch) >= (1,6,0):
|
||||
import std/[isolation, tasks]
|
||||
export isolation
|
||||
else:
|
||||
import pkg/taskpools/shims_pre_1_6/tasks
|
||||
|
||||
import
|
||||
std/[cpuinfo, atomics, macros]
|
||||
|
||||
logScope:
|
||||
topics = "codex asyncerasure"
|
||||
|
||||
|
@ -56,10 +81,52 @@ type
|
|||
|
||||
proc dumpOf(prefix: string, bytes: seq[seq[byte]]): void =
|
||||
for i in 0..<bytes.len:
|
||||
# if bytes[i].isNil:
|
||||
# echo "bytes " & $i $ " is nil"
|
||||
if bytes[i].len > 0:
|
||||
io2.writeFile(prefix & $i, bytes[i]).tryGet()
|
||||
|
||||
proc hashOf(bytes: ref seq[seq[byte]]): string =
|
||||
template tempHashOf(bytes: untyped): untyped =
|
||||
echo "len is " & $len(bytes)
|
||||
var totalLen = 0
|
||||
for i in 0..<len(bytes):
|
||||
totalLen = totalLen + bytes[i].len
|
||||
|
||||
var buf = newSeq[byte]()
|
||||
|
||||
buf.setLen(totalLen)
|
||||
|
||||
var offset = 0
|
||||
for i in 0..<len(bytes):
|
||||
if bytes[i].len > 0:
|
||||
echo "pointer " & $i & " " & (unsafeAddr bytes[i][0]).repr
|
||||
copyMem(addr buf[offset], unsafeAddr bytes[i][0], bytes[i].len)
|
||||
offset = offset + bytes[i].len
|
||||
|
||||
let mhash = MultiHash.digest("sha2-256", buf)
|
||||
mhash.get().hex
|
||||
|
||||
|
||||
proc hashOf(bytes: seq[seq[byte]]): string =
|
||||
var totalLen = 0
|
||||
for i in 0..<len(bytes):
|
||||
totalLen = totalLen + bytes[i].len
|
||||
|
||||
var buf = newSeq[byte]()
|
||||
|
||||
buf.setLen(totalLen)
|
||||
|
||||
var offset = 0
|
||||
for i in 0..<len(bytes):
|
||||
if bytes[i].len > 0:
|
||||
copyMem(addr buf[offset], unsafeAddr bytes[i][0], bytes[i].len)
|
||||
offset = offset + bytes[i].len
|
||||
|
||||
let mhash = MultiHash.digest("sha2-256", buf)
|
||||
return mhash.get().hex
|
||||
|
||||
|
||||
proc hashOfRef(bytes: ref seq[seq[byte]]): string =
|
||||
var totalLen = 0
|
||||
for i in 0..<len(bytes[]):
|
||||
totalLen = totalLen + bytes[i].len
|
||||
|
@ -88,9 +155,9 @@ proc unsafeHashOf(bytes: seq[pointer], lens: seq[int]): string =
|
|||
|
||||
var offset = 0
|
||||
for i in 0..<lens.len:
|
||||
echo "pointer " & $i & " " & bytes[i].repr
|
||||
let l = lens[i]
|
||||
if l > 0:
|
||||
echo "pointer " & $i & " " & bytes[i].repr
|
||||
copyMem(addr buf[offset], bytes[i], l)
|
||||
offset = offset + l
|
||||
|
||||
|
@ -128,6 +195,8 @@ proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
|
|||
|
||||
proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[byte]], debug: bool): DecodeTaskResult =
|
||||
|
||||
echo "starting task code"
|
||||
|
||||
if debug:
|
||||
dumpOf("thread_data_", odata)
|
||||
dumpOf("thread_parity", oparity)
|
||||
|
@ -143,6 +212,10 @@ proc decodeTask(args: DecodeTaskArgs, odata: seq[seq[byte]], oparity: seq[seq[by
|
|||
|
||||
var ptrsParity: seq[pointer]
|
||||
for i in 0..<oparity.len:
|
||||
# if (unsafeAddr oparity[i]).isNil:
|
||||
# echo "oparity is Nil " & $i
|
||||
# ptrsParity.add(unsafeAddr oparity)
|
||||
# else:
|
||||
if oparity[i].len > 0:
|
||||
ptrsParity.add(unsafeAddr oparity[i][0])
|
||||
else:
|
||||
|
@ -199,12 +272,101 @@ proc proxySpawnDecodeTask(
|
|||
data: ref seq[seq[byte]],
|
||||
parity: ref seq[seq[byte]]
|
||||
): Flowvar[DecodeTaskResult] =
|
||||
let h = hashOf(data)
|
||||
echo "proxy hash of data " & h
|
||||
# let h = hashOfRef(data)
|
||||
# echo "proxy hash of data " & h
|
||||
|
||||
let debug = h == "12209C9675C6D0F65E90554E4251EAA8B4F1DE46E8178FD885B98A607F127C64C5C3"
|
||||
let h1 = tempHashOf(data[])
|
||||
echo "proxy hash of data " & h1
|
||||
|
||||
tp.spawn decodeTask(args, data[], parity[], debug)
|
||||
let ph1 = tempHashOf(parity[])
|
||||
echo "proxy hash of parity " & ph1
|
||||
|
||||
let debug = h1 == "12208A6C662044230A1760A4E1EE77D5D6A4C9176BA8E04F15C532FBB3D06D42F0D3"
|
||||
|
||||
|
||||
let fut = newFlowVar(typeof(DecodeTaskResult))
|
||||
proc taskpool_decodeTask(args: DecodeTaskArgs; odata: seq[seq[byte]];
|
||||
oparity: seq[seq[byte]]; debug: bool;
|
||||
fut: Flowvar[DecodeTaskResult]) {.nimcall.} =
|
||||
let resgensym115 = decodeTask(args, odata, oparity, debug)
|
||||
readyWith(fut, resgensym115)
|
||||
|
||||
type
|
||||
ScratchObj_11005855178 = object
|
||||
args: DecodeTaskArgs
|
||||
odata: seq[seq[byte]]
|
||||
oparity: seq[seq[byte]]
|
||||
debug: bool
|
||||
fut: Flowvar[EncodeTaskResult]
|
||||
|
||||
let scratch_11005855162 = cast[ptr ScratchObj_11005855178](c_calloc(
|
||||
csize_t(1), csize_t(64)))
|
||||
if isNil(scratch_11005855162):
|
||||
raise
|
||||
(ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
|
||||
block:
|
||||
var isoTemp_11005855168 = isolate(args)
|
||||
scratch_11005855162.args = extract(isoTemp_11005855168)
|
||||
var isoTemp_11005855170: Isolated[seq[seq[byte]]] = isolate(data[])
|
||||
|
||||
let h2 = tempHashOf(isoTemp_11005855170.value)
|
||||
echo "proxy hash of isolated data " & h2
|
||||
|
||||
scratch_11005855162.odata = extract(isoTemp_11005855170)
|
||||
|
||||
let h3 = tempHashOf(scratch_11005855162.odata)
|
||||
echo "proxy hash of moved data " & h3
|
||||
|
||||
|
||||
|
||||
var isoTemp_11005855172 = isolate(parity[])
|
||||
|
||||
let ph2 = tempHashOf(isoTemp_11005855172.value)
|
||||
echo "proxy hash of isolated parity " & ph2
|
||||
|
||||
scratch_11005855162.oparity = extract(isoTemp_11005855172)
|
||||
|
||||
GC_fullCollect()
|
||||
|
||||
# let ph3 = tempHashOf(scratch_11005855162.oparity)
|
||||
# echo "proxy hash of moved parity " & ph3
|
||||
|
||||
# let h22 = tempHashOf(isoTemp_11005855170.value)
|
||||
# echo "proxy hash of isolated data 2 " & h22
|
||||
|
||||
# let h32 = tempHashOf(scratch_11005855162.odata)
|
||||
# echo "proxy hash of moved data 2 " & h32
|
||||
|
||||
var isoTemp_11005855174 = isolate(debug)
|
||||
scratch_11005855162.debug = extract(isoTemp_11005855174)
|
||||
var isoTemp_11005855176 = isolate(fut)
|
||||
scratch_11005855162.fut = extract(isoTemp_11005855176)
|
||||
proc taskpool_decodeTask_11005855179(argsgensym120: pointer) {.gcsafe,
|
||||
nimcall.} =
|
||||
let objTemp_11005855167 = cast[ptr ScratchObj_11005855178](argsgensym120)
|
||||
let args_11005855169 = objTemp_11005855167.args
|
||||
let odata_11005855171 = objTemp_11005855167.odata
|
||||
let oparity_11005855173 = objTemp_11005855167.oparity
|
||||
let debug_11005855175 = objTemp_11005855167.debug
|
||||
let fut_11005855177 = objTemp_11005855167.fut
|
||||
taskpool_decodeTask(args_11005855169, odata_11005855171, oparity_11005855173,
|
||||
debug_11005855175, fut_11005855177)
|
||||
|
||||
proc destroyScratch_11005855180(argsgensym120: pointer) {.gcsafe, nimcall.} =
|
||||
let obj_11005855181 = cast[ptr ScratchObj_11005855178](argsgensym120)
|
||||
`=destroy`(obj_11005855181[])
|
||||
|
||||
let task = Task(callback: taskpool_decodeTask_11005855179, args: scratch_11005855162,
|
||||
destroy: destroyScratch_11005855180)
|
||||
|
||||
let taskNode = new(TaskNode, taskpools.workerContext.currentTask, task)
|
||||
schedule(taskpools.workerContext, taskNode)
|
||||
fut
|
||||
|
||||
# expandMacros:
|
||||
# tp.spawn decodeTask(args, data[], parity[], debug)
|
||||
|
||||
# tp.spawn decodeTask(args, data[], parity[], debug)
|
||||
# let res = DecodeTaskResult.newFlowVar
|
||||
|
||||
# res.readyWith(decodeTask(args, data[], parity[], debug))
|
||||
|
@ -271,8 +433,8 @@ proc asyncDecode*(
|
|||
without signal =? ThreadSignalPtr.new().mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
echo "orig hash of data " & hashOf(data)
|
||||
# echo "hash of parity " & hashOf(parity)
|
||||
echo "orig hash of data " & hashOfRef(data)
|
||||
# echo "hash of parity " & hashOfRef(parity)
|
||||
|
||||
try:
|
||||
let
|
||||
|
@ -293,7 +455,7 @@ proc asyncDecode*(
|
|||
recovered[i] = newSeq[byte](blockSize)
|
||||
copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)
|
||||
|
||||
# echo "orig hash of recovered " & hashOf(recovered)
|
||||
# echo "orig hash of recovered " & hashOfRef(recovered)
|
||||
|
||||
var ptrs: seq[pointer]
|
||||
|
||||
|
@ -303,7 +465,7 @@ proc asyncDecode*(
|
|||
|
||||
# echo "unsafe hash of recovered" & unsafeHashOf(ptrs, recovered[].mapIt(it.len))
|
||||
|
||||
# echo "orig hash of parity " & hashOf(parity)
|
||||
# echo "orig hash of parity " & hashOfRef(parity)
|
||||
|
||||
deallocShared(res.value.data)
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ suite "Erasure encode/decode":
|
|||
# for blockSize in @(1..<8).mapIt(it * 1024):
|
||||
# echo $blockSize
|
||||
|
||||
|
||||
let
|
||||
file = open("test_file.bin")
|
||||
chunker = FileChunker.new(file = file, chunkSize = blockSize)
|
||||
|
|
Loading…
Reference in New Issue