From 93b2df10b712b9b0afe08227af59ffc168dadcd9 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Wed, 9 Jul 2025 20:40:55 +0530 Subject: [PATCH] fix segfault issues --- codex/merkletree/codex/codex.nim | 8 ++-- codex/merkletree/merkletree.nim | 74 +++++++++++++++++++++++++++-- codex/merkletree/poseidon2.nim | 6 +-- codex/slots/builder/builder.nim | 4 +- codex/utils/poseidon2digest.nim | 20 ++++---- tests/codex/node/testnode.nim | 2 + tests/codex/node/testslotrepair.nim | 3 +- 7 files changed, 95 insertions(+), 22 deletions(-) diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index b0892dda..67e1507e 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -191,7 +191,7 @@ proc init*( var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) var task = - CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal) + CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: leaves, signal: signal) doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" @@ -208,10 +208,10 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - defer: - task.layers = default(Isolated[seq[seq[ByteHash]]]) + # defer: + # task.layers = default(Isolated[seq[seq[ByteHash]]]) - tree.layers = task.layers.extract + tree.layers = extractValue(task.layers) success tree diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index c973a7e1..bf9d4c92 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -17,6 +17,68 @@ import pkg/chronos/threadsync import ../errors +type UniqueSeq*[T] = object + ## A unique pointer to a seq[seq[T]] in shared memory + ## Can only be moved, not copied + data: ptr seq[seq[T]] + +proc newUniqueSeq*[T](data: sink Isolated[seq[seq[T]]]): UniqueSeq[T] = + ## Creates a new unique sequence in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr seq[seq[T]]](allocShared0(sizeof(seq[seq[T]]))) + + result.data[] = extract(data) + +proc `=destroy`*[T](p: var UniqueSeq[T]) = + ## Destructor for UniqueSeq + if p.data != nil: + # Clear the sequence to release inner sequences + p.data[].setLen(0) + echo "destroying unique seq" + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniqueSeq[T], src: UniqueSeq[T] +) {.error: "UniqueSeq cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniqueSeq[T], src: UniqueSeq[T]) = + ## Move constructor for UniqueSeq + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniqueSeq[T]): lent seq[seq[T]] = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniqueSeq") + p.data[] + +proc `[]`*[T](p: var UniqueSeq[T]): var seq[seq[T]] = + ## Access the data (mutable) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniqueSeq") + p.data[] + +proc isNil*[T](p: UniqueSeq[T]): bool = + ## Check if the UniqueSeq is nil + p.data == nil + +proc extractValue*[T](p: var UniqueSeq[T]): seq[seq[T]] = + ## Extract the value from the UniqueSeq and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniqueSeq") + + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + + # Free the shared memory + deallocShared(p.data) + p.data = nil + type CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].} @@ -36,7 +98,7 @@ type tree*: ptr MerkleTree[H, K] leaves*: seq[H] signal*: ThreadSignalPtr - layers*: Isolated[seq[seq[H]]] + layers*: UniqueSeq[H] success*: Atomic[bool] func depth*[H, K](self: MerkleTree[H, K]): int = @@ -171,6 +233,12 @@ proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = task[].success.store(false) return - var isolatedLayers = isolate(res.get()) - task[].layers = isolatedLayers + var l = res.get() + var newOuterSeq = newSeq[seq[H]](l.len) + for i in 0 ..< l.len: + var isoInner = isolate(l[i]) + newOuterSeq[i] = extract(isoInner) + + var isolatedLayers = isolate(newOuterSeq) + task[].layers = newUniqueSeq(isolatedLayers) task[].success.store(true) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 64f9bc01..0917276c 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -118,10 +118,10 @@ proc init*( if not task.success.load(): return failure("merkle tree task failed") - defer: - task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) + # defer: + # task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) - tree.layers = task.layers.extract + tree.layers = extractValue(task.layers) success tree diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 34c3ed9a..a2abb80b 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -191,11 +191,11 @@ proc getBlockDigest*[T, H]( if blk.isEmpty: return self.emptyDigestTree.root - without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: error "Failed to create digest for block", err = err.msg return failure(err) - return success digest + return success dg proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 7607aee2..7e7c37f2 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -23,7 +23,7 @@ type DigestTask* = object bytes: seq[byte] chunkSize: int success: Atomic[bool] - digest: Isolated[Poseidon2Hash] + digest: ptr Poseidon2Hash export DigestTask @@ -85,7 +85,8 @@ proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = task[].success.store(false) return - task[].digest = isolate(res.get()) + var isolatedDigest = isolate(res.get()) + task[].digest[] = extract(isolatedDigest) task[].success.store(true) proc digest*( @@ -93,14 +94,18 @@ proc digest*( ): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = without signal =? ThreadSignalPtr.new(): return failure("Unable to create thread signal") - defer: signal.close().expect("closing once works") doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + var task = DigestTask( + signal: signal, + bytes: bytes, + chunkSize: chunkSize, + digest: cast[ptr Poseidon2Hash](allocShared(sizeof(Poseidon2Hash))), + ) tp.spawn digestWorker(tp, addr task) @@ -114,11 +119,10 @@ proc digest*( if not task.success.load(): return failure("digest task failed") + var isolatedDigest = isolate(task.digest[]) + var digest = extract(isolatedDigest) defer: - task.digest = default(Isolated[Poseidon2Hash]) - - var digest = task.digest.extract - + deallocShared(task.digest) success digest func digestMhash*( diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 46e6df3a..d182927d 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -178,12 +178,14 @@ asyncchecksuite "Test Node - Basic": check string.fromBytes(data) == testString test "Setup purchase request": + echo "Here the tedt" let erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() + let builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index 3d588a6d..d074efca 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -52,7 +52,6 @@ asyncchecksuite "Test Node - Slot Repair": ) var manifest: Manifest - taskPool: Taskpool builder: Poseidon2Builder verifiable: Manifest verifiableBlock: bt.Block @@ -102,7 +101,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()