mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
fix segfault issues
This commit is contained in:
parent
3c39dad723
commit
2125371915
@ -191,7 +191,7 @@ proc init*(
|
||||
var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec)
|
||||
|
||||
var task =
|
||||
CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal)
|
||||
CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: leaves, signal: signal)
|
||||
|
||||
doAssert tp.numThreads > 1,
|
||||
"Must have at least one separate thread or signal will never be fired"
|
||||
@ -208,10 +208,10 @@ proc init*(
|
||||
if not task.success.load():
|
||||
return failure("merkle tree task failed")
|
||||
|
||||
defer:
|
||||
task.layers = default(Isolated[seq[seq[ByteHash]]])
|
||||
# defer:
|
||||
# task.layers = default(Isolated[seq[seq[ByteHash]]])
|
||||
|
||||
tree.layers = task.layers.extract
|
||||
tree.layers = extractValue(task.layers)
|
||||
|
||||
success tree
|
||||
|
||||
|
||||
@ -17,6 +17,68 @@ import pkg/chronos/threadsync
|
||||
|
||||
import ../errors
|
||||
|
||||
type UniqueSeq*[T] = object
|
||||
## A unique pointer to a seq[seq[T]] in shared memory
|
||||
## Can only be moved, not copied
|
||||
data: ptr seq[seq[T]]
|
||||
|
||||
proc newUniqueSeq*[T](data: sink Isolated[seq[seq[T]]]): UniqueSeq[T] =
|
||||
## Creates a new unique sequence in shared memory
|
||||
## The memory is automatically freed when the object is destroyed
|
||||
result.data = cast[ptr seq[seq[T]]](allocShared0(sizeof(seq[seq[T]])))
|
||||
|
||||
result.data[] = extract(data)
|
||||
|
||||
proc `=destroy`*[T](p: var UniqueSeq[T]) =
|
||||
## Destructor for UniqueSeq
|
||||
if p.data != nil:
|
||||
# Clear the sequence to release inner sequences
|
||||
p.data[].setLen(0)
|
||||
echo "destroying unique seq"
|
||||
deallocShared(p.data)
|
||||
p.data = nil
|
||||
|
||||
proc `=copy`*[T](
|
||||
dest: var UniqueSeq[T], src: UniqueSeq[T]
|
||||
) {.error: "UniqueSeq cannot be copied, only moved".}
|
||||
|
||||
proc `=sink`*[T](dest: var UniqueSeq[T], src: UniqueSeq[T]) =
|
||||
## Move constructor for UniqueSeq
|
||||
if dest.data != nil:
|
||||
`=destroy`(dest)
|
||||
dest.data = src.data
|
||||
# We need to nil out the source data to prevent double-free
|
||||
# This is handled by Nim's destructive move semantics
|
||||
|
||||
proc `[]`*[T](p: UniqueSeq[T]): lent seq[seq[T]] =
|
||||
## Access the data (read-only)
|
||||
if p.data == nil:
|
||||
raise newException(NilAccessDefect, "accessing nil UniqueSeq")
|
||||
p.data[]
|
||||
|
||||
proc `[]`*[T](p: var UniqueSeq[T]): var seq[seq[T]] =
|
||||
## Access the data (mutable)
|
||||
if p.data == nil:
|
||||
raise newException(NilAccessDefect, "accessing nil UniqueSeq")
|
||||
p.data[]
|
||||
|
||||
proc isNil*[T](p: UniqueSeq[T]): bool =
|
||||
## Check if the UniqueSeq is nil
|
||||
p.data == nil
|
||||
|
||||
proc extractValue*[T](p: var UniqueSeq[T]): seq[seq[T]] =
|
||||
## Extract the value from the UniqueSeq and release the memory
|
||||
if p.data == nil:
|
||||
raise newException(NilAccessDefect, "extracting from nil UniqueSeq")
|
||||
|
||||
# Move the value out
|
||||
var isolated = isolate(p.data[])
|
||||
result = extract(isolated)
|
||||
|
||||
# Free the shared memory
|
||||
deallocShared(p.data)
|
||||
p.data = nil
|
||||
|
||||
type
|
||||
CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].}
|
||||
|
||||
@ -36,7 +98,7 @@ type
|
||||
tree*: ptr MerkleTree[H, K]
|
||||
leaves*: seq[H]
|
||||
signal*: ThreadSignalPtr
|
||||
layers*: Isolated[seq[seq[H]]]
|
||||
layers*: UniqueSeq[H]
|
||||
success*: Atomic[bool]
|
||||
|
||||
func depth*[H, K](self: MerkleTree[H, K]): int =
|
||||
@ -171,6 +233,12 @@ proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
|
||||
task[].success.store(false)
|
||||
return
|
||||
|
||||
var isolatedLayers = isolate(res.get())
|
||||
task[].layers = isolatedLayers
|
||||
var l = res.get()
|
||||
var newOuterSeq = newSeq[seq[H]](l.len)
|
||||
for i in 0 ..< l.len:
|
||||
var isoInner = isolate(l[i])
|
||||
newOuterSeq[i] = extract(isoInner)
|
||||
|
||||
var isolatedLayers = isolate(newOuterSeq)
|
||||
task[].layers = newUniqueSeq(isolatedLayers)
|
||||
task[].success.store(true)
|
||||
|
||||
@ -118,10 +118,10 @@ proc init*(
|
||||
if not task.success.load():
|
||||
return failure("merkle tree task failed")
|
||||
|
||||
defer:
|
||||
task.layers = default(Isolated[seq[seq[Poseidon2Hash]]])
|
||||
# defer:
|
||||
# task.layers = default(Isolated[seq[seq[Poseidon2Hash]]])
|
||||
|
||||
tree.layers = task.layers.extract
|
||||
tree.layers = extractValue(task.layers)
|
||||
|
||||
success tree
|
||||
|
||||
|
||||
@ -191,11 +191,11 @@ proc getBlockDigest*[T, H](
|
||||
if blk.isEmpty:
|
||||
return self.emptyDigestTree.root
|
||||
|
||||
without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err:
|
||||
without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err:
|
||||
error "Failed to create digest for block", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success digest
|
||||
return success dg
|
||||
|
||||
proc getCellHashes*[T, H](
|
||||
self: SlotsBuilder[T, H], slotIndex: Natural
|
||||
|
||||
@ -23,7 +23,7 @@ type DigestTask* = object
|
||||
bytes: seq[byte]
|
||||
chunkSize: int
|
||||
success: Atomic[bool]
|
||||
digest: Isolated[Poseidon2Hash]
|
||||
digest: ptr Poseidon2Hash
|
||||
|
||||
export DigestTask
|
||||
|
||||
@ -85,7 +85,8 @@ proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} =
|
||||
task[].success.store(false)
|
||||
return
|
||||
|
||||
task[].digest = isolate(res.get())
|
||||
var isolatedDigest = isolate(res.get())
|
||||
task[].digest[] = extract(isolatedDigest)
|
||||
task[].success.store(true)
|
||||
|
||||
proc digest*(
|
||||
@ -93,14 +94,18 @@ proc digest*(
|
||||
): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} =
|
||||
without signal =? ThreadSignalPtr.new():
|
||||
return failure("Unable to create thread signal")
|
||||
|
||||
defer:
|
||||
signal.close().expect("closing once works")
|
||||
|
||||
doAssert tp.numThreads > 1,
|
||||
"Must have at least one separate thread or signal will never be fired"
|
||||
|
||||
var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize)
|
||||
var task = DigestTask(
|
||||
signal: signal,
|
||||
bytes: bytes,
|
||||
chunkSize: chunkSize,
|
||||
digest: cast[ptr Poseidon2Hash](allocShared(sizeof(Poseidon2Hash))),
|
||||
)
|
||||
|
||||
tp.spawn digestWorker(tp, addr task)
|
||||
|
||||
@ -114,11 +119,10 @@ proc digest*(
|
||||
if not task.success.load():
|
||||
return failure("digest task failed")
|
||||
|
||||
var isolatedDigest = isolate(task.digest[])
|
||||
var digest = extract(isolatedDigest)
|
||||
defer:
|
||||
task.digest = default(Isolated[Poseidon2Hash])
|
||||
|
||||
var digest = task.digest.extract
|
||||
|
||||
deallocShared(task.digest)
|
||||
success digest
|
||||
|
||||
func digestMhash*(
|
||||
|
||||
@ -178,12 +178,14 @@ asyncchecksuite "Test Node - Basic":
|
||||
check string.fromBytes(data) == testString
|
||||
|
||||
test "Setup purchase request":
|
||||
echo "Here the tedt"
|
||||
let
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool)
|
||||
manifest = await storeDataGetManifest(localStore, chunker)
|
||||
manifestBlock =
|
||||
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
|
||||
protected = (await erasure.encode(manifest, 3, 2)).tryGet()
|
||||
let
|
||||
builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
|
||||
verifiable = (await builder.buildManifest()).tryGet()
|
||||
verifiableBlock =
|
||||
|
||||
@ -52,7 +52,6 @@ asyncchecksuite "Test Node - Slot Repair":
|
||||
)
|
||||
var
|
||||
manifest: Manifest
|
||||
taskPool: Taskpool
|
||||
builder: Poseidon2Builder
|
||||
verifiable: Manifest
|
||||
verifiableBlock: bt.Block
|
||||
@ -102,7 +101,7 @@ asyncchecksuite "Test Node - Slot Repair":
|
||||
(await localStore.putBlock(manifestBlock)).tryGet()
|
||||
|
||||
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
|
||||
builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
|
||||
builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet()
|
||||
verifiable = (await builder.buildManifest()).tryGet()
|
||||
verifiableBlock =
|
||||
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user