mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-08 00:13:08 +00:00
use uniqueptr for safe memory managment
This commit is contained in:
parent
93b2df10b7
commit
7c2662d7fe
@ -25,6 +25,8 @@ import ../../blocktype
|
|||||||
|
|
||||||
from ../../utils/digest import digestBytes
|
from ../../utils/digest import digestBytes
|
||||||
|
|
||||||
|
import ../../utils/uniqueptr
|
||||||
|
|
||||||
import ../merkletree
|
import ../merkletree
|
||||||
|
|
||||||
export merkletree
|
export merkletree
|
||||||
@ -208,9 +210,6 @@ proc init*(
|
|||||||
if not task.success.load():
|
if not task.success.load():
|
||||||
return failure("merkle tree task failed")
|
return failure("merkle tree task failed")
|
||||||
|
|
||||||
# defer:
|
|
||||||
# task.layers = default(Isolated[seq[seq[ByteHash]]])
|
|
||||||
|
|
||||||
tree.layers = extractValue(task.layers)
|
tree.layers = extractValue(task.layers)
|
||||||
|
|
||||||
success tree
|
success tree
|
||||||
|
|||||||
@ -16,68 +16,7 @@ import pkg/taskpools
|
|||||||
import pkg/chronos/threadsync
|
import pkg/chronos/threadsync
|
||||||
|
|
||||||
import ../errors
|
import ../errors
|
||||||
|
import ../utils/uniqueptr
|
||||||
type UniqueSeq*[T] = object
|
|
||||||
## A unique pointer to a seq[seq[T]] in shared memory
|
|
||||||
## Can only be moved, not copied
|
|
||||||
data: ptr seq[seq[T]]
|
|
||||||
|
|
||||||
proc newUniqueSeq*[T](data: sink Isolated[seq[seq[T]]]): UniqueSeq[T] =
|
|
||||||
## Creates a new unique sequence in shared memory
|
|
||||||
## The memory is automatically freed when the object is destroyed
|
|
||||||
result.data = cast[ptr seq[seq[T]]](allocShared0(sizeof(seq[seq[T]])))
|
|
||||||
|
|
||||||
result.data[] = extract(data)
|
|
||||||
|
|
||||||
proc `=destroy`*[T](p: var UniqueSeq[T]) =
|
|
||||||
## Destructor for UniqueSeq
|
|
||||||
if p.data != nil:
|
|
||||||
# Clear the sequence to release inner sequences
|
|
||||||
p.data[].setLen(0)
|
|
||||||
echo "destroying unique seq"
|
|
||||||
deallocShared(p.data)
|
|
||||||
p.data = nil
|
|
||||||
|
|
||||||
proc `=copy`*[T](
|
|
||||||
dest: var UniqueSeq[T], src: UniqueSeq[T]
|
|
||||||
) {.error: "UniqueSeq cannot be copied, only moved".}
|
|
||||||
|
|
||||||
proc `=sink`*[T](dest: var UniqueSeq[T], src: UniqueSeq[T]) =
|
|
||||||
## Move constructor for UniqueSeq
|
|
||||||
if dest.data != nil:
|
|
||||||
`=destroy`(dest)
|
|
||||||
dest.data = src.data
|
|
||||||
# We need to nil out the source data to prevent double-free
|
|
||||||
# This is handled by Nim's destructive move semantics
|
|
||||||
|
|
||||||
proc `[]`*[T](p: UniqueSeq[T]): lent seq[seq[T]] =
|
|
||||||
## Access the data (read-only)
|
|
||||||
if p.data == nil:
|
|
||||||
raise newException(NilAccessDefect, "accessing nil UniqueSeq")
|
|
||||||
p.data[]
|
|
||||||
|
|
||||||
proc `[]`*[T](p: var UniqueSeq[T]): var seq[seq[T]] =
|
|
||||||
## Access the data (mutable)
|
|
||||||
if p.data == nil:
|
|
||||||
raise newException(NilAccessDefect, "accessing nil UniqueSeq")
|
|
||||||
p.data[]
|
|
||||||
|
|
||||||
proc isNil*[T](p: UniqueSeq[T]): bool =
|
|
||||||
## Check if the UniqueSeq is nil
|
|
||||||
p.data == nil
|
|
||||||
|
|
||||||
proc extractValue*[T](p: var UniqueSeq[T]): seq[seq[T]] =
|
|
||||||
## Extract the value from the UniqueSeq and release the memory
|
|
||||||
if p.data == nil:
|
|
||||||
raise newException(NilAccessDefect, "extracting from nil UniqueSeq")
|
|
||||||
|
|
||||||
# Move the value out
|
|
||||||
var isolated = isolate(p.data[])
|
|
||||||
result = extract(isolated)
|
|
||||||
|
|
||||||
# Free the shared memory
|
|
||||||
deallocShared(p.data)
|
|
||||||
p.data = nil
|
|
||||||
|
|
||||||
type
|
type
|
||||||
CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].}
|
CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].}
|
||||||
@ -98,7 +37,7 @@ type
|
|||||||
tree*: ptr MerkleTree[H, K]
|
tree*: ptr MerkleTree[H, K]
|
||||||
leaves*: seq[H]
|
leaves*: seq[H]
|
||||||
signal*: ThreadSignalPtr
|
signal*: ThreadSignalPtr
|
||||||
layers*: UniqueSeq[H]
|
layers*: UniquePtr[seq[seq[H]]]
|
||||||
success*: Atomic[bool]
|
success*: Atomic[bool]
|
||||||
|
|
||||||
func depth*[H, K](self: MerkleTree[H, K]): int =
|
func depth*[H, K](self: MerkleTree[H, K]): int =
|
||||||
@ -233,12 +172,11 @@ proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
|
|||||||
task[].success.store(false)
|
task[].success.store(false)
|
||||||
return
|
return
|
||||||
|
|
||||||
var l = res.get()
|
var layers = res.get()
|
||||||
var newOuterSeq = newSeq[seq[H]](l.len)
|
var newOuterSeq = newSeq[seq[H]](layers.len)
|
||||||
for i in 0 ..< l.len:
|
for i in 0 ..< layers.len:
|
||||||
var isoInner = isolate(l[i])
|
var isoInner = isolate(layers[i])
|
||||||
newOuterSeq[i] = extract(isoInner)
|
newOuterSeq[i] = extract(isoInner)
|
||||||
|
|
||||||
var isolatedLayers = isolate(newOuterSeq)
|
task[].layers = newUniquePtr(newOuterSeq)
|
||||||
task[].layers = newUniqueSeq(isolatedLayers)
|
|
||||||
task[].success.store(true)
|
task[].success.store(true)
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import pkg/constantine/platforms/abstractions
|
|||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
import ../utils
|
import ../utils
|
||||||
|
import ../utils/uniqueptr
|
||||||
import ../rng
|
import ../rng
|
||||||
|
|
||||||
import ./merkletree
|
import ./merkletree
|
||||||
@ -118,9 +119,6 @@ proc init*(
|
|||||||
if not task.success.load():
|
if not task.success.load():
|
||||||
return failure("merkle tree task failed")
|
return failure("merkle tree task failed")
|
||||||
|
|
||||||
# defer:
|
|
||||||
# task.layers = default(Isolated[seq[seq[Poseidon2Hash]]])
|
|
||||||
|
|
||||||
tree.layers = extractValue(task.layers)
|
tree.layers = extractValue(task.layers)
|
||||||
|
|
||||||
success tree
|
success tree
|
||||||
|
|||||||
@ -423,7 +423,7 @@ proc nattedAddress*(
|
|||||||
it.remapAddr(ip = newIP, port = tcp)
|
it.remapAddr(ip = newIP, port = tcp)
|
||||||
else:
|
else:
|
||||||
# NAT mapping failed - use original address
|
# NAT mapping failed - use original address
|
||||||
echo "Failed to get external IP, using original address", it
|
error "Failed to get external IP, using original address", it
|
||||||
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort))
|
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort))
|
||||||
it
|
it
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -15,6 +15,7 @@ import pkg/stew/byteutils
|
|||||||
import pkg/taskpools
|
import pkg/taskpools
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/chronos/threadsync
|
import pkg/chronos/threadsync
|
||||||
|
import ./uniqueptr
|
||||||
|
|
||||||
import ../merkletree
|
import ../merkletree
|
||||||
|
|
||||||
@ -23,7 +24,7 @@ type DigestTask* = object
|
|||||||
bytes: seq[byte]
|
bytes: seq[byte]
|
||||||
chunkSize: int
|
chunkSize: int
|
||||||
success: Atomic[bool]
|
success: Atomic[bool]
|
||||||
digest: ptr Poseidon2Hash
|
digest: UniquePtr[Poseidon2Hash]
|
||||||
|
|
||||||
export DigestTask
|
export DigestTask
|
||||||
|
|
||||||
@ -85,8 +86,7 @@ proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} =
|
|||||||
task[].success.store(false)
|
task[].success.store(false)
|
||||||
return
|
return
|
||||||
|
|
||||||
var isolatedDigest = isolate(res.get())
|
task[].digest = newUniquePtr(res.get())
|
||||||
task[].digest[] = extract(isolatedDigest)
|
|
||||||
task[].success.store(true)
|
task[].success.store(true)
|
||||||
|
|
||||||
proc digest*(
|
proc digest*(
|
||||||
@ -100,12 +100,7 @@ proc digest*(
|
|||||||
doAssert tp.numThreads > 1,
|
doAssert tp.numThreads > 1,
|
||||||
"Must have at least one separate thread or signal will never be fired"
|
"Must have at least one separate thread or signal will never be fired"
|
||||||
|
|
||||||
var task = DigestTask(
|
var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize)
|
||||||
signal: signal,
|
|
||||||
bytes: bytes,
|
|
||||||
chunkSize: chunkSize,
|
|
||||||
digest: cast[ptr Poseidon2Hash](allocShared(sizeof(Poseidon2Hash))),
|
|
||||||
)
|
|
||||||
|
|
||||||
tp.spawn digestWorker(tp, addr task)
|
tp.spawn digestWorker(tp, addr task)
|
||||||
|
|
||||||
@ -119,11 +114,7 @@ proc digest*(
|
|||||||
if not task.success.load():
|
if not task.success.load():
|
||||||
return failure("digest task failed")
|
return failure("digest task failed")
|
||||||
|
|
||||||
var isolatedDigest = isolate(task.digest[])
|
success extractValue(task.digest)
|
||||||
var digest = extract(isolatedDigest)
|
|
||||||
defer:
|
|
||||||
deallocShared(task.digest)
|
|
||||||
success digest
|
|
||||||
|
|
||||||
func digestMhash*(
|
func digestMhash*(
|
||||||
_: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
|
_: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
|
||||||
|
|||||||
58
codex/utils/uniqueptr.nim
Normal file
58
codex/utils/uniqueptr.nim
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import std/isolation
|
||||||
|
type UniquePtr*[T] = object
|
||||||
|
## A unique pointer to a seq[seq[T]] in shared memory
|
||||||
|
## Can only be moved, not copied
|
||||||
|
data: ptr T
|
||||||
|
|
||||||
|
proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] =
|
||||||
|
## Creates a new unique sequence in shared memory
|
||||||
|
## The memory is automatically freed when the object is destroyed
|
||||||
|
result.data = cast[ptr T](allocShared0(sizeof(T)))
|
||||||
|
result.data[] = extract(data)
|
||||||
|
|
||||||
|
template newUniquePtr*[T](data: T): UniquePtr[T] =
|
||||||
|
newUniquePtr(isolate(data))
|
||||||
|
|
||||||
|
proc `=destroy`*[T](p: var UniquePtr[T]) =
|
||||||
|
## Destructor for UniquePtr
|
||||||
|
if p.data != nil:
|
||||||
|
deallocShared(p.data)
|
||||||
|
p.data = nil
|
||||||
|
|
||||||
|
proc `=copy`*[T](
|
||||||
|
dest: var UniquePtr[T], src: UniquePtr[T]
|
||||||
|
) {.error: "UniquePtr cannot be copied, only moved".}
|
||||||
|
|
||||||
|
proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) =
|
||||||
|
if dest.data != nil:
|
||||||
|
`=destroy`(dest)
|
||||||
|
dest.data = src.data
|
||||||
|
# We need to nil out the source data to prevent double-free
|
||||||
|
# This is handled by Nim's destructive move semantics
|
||||||
|
|
||||||
|
proc `[]`*[T](p: UniquePtr[T]): lent T =
|
||||||
|
## Access the data (read-only)
|
||||||
|
if p.data == nil:
|
||||||
|
raise newException(NilAccessDefect, "accessing nil UniquePtr")
|
||||||
|
p.data[]
|
||||||
|
|
||||||
|
# proc `[]`*[T](p: var UniquePtr[T]): var T =
|
||||||
|
# ## Access the data (mutable)
|
||||||
|
# if p.data == nil:
|
||||||
|
# raise newException(NilAccessDefect, "accessing nil UniquePtr")
|
||||||
|
# p.data[]
|
||||||
|
|
||||||
|
proc isNil*[T](p: UniquePtr[T]): bool =
|
||||||
|
## Check if the UniquePtr is nil
|
||||||
|
p.data == nil
|
||||||
|
|
||||||
|
proc extractValue*[T](p: var UniquePtr[T]): T =
|
||||||
|
## Extract the value from the UniquePtr and release the memory
|
||||||
|
if p.data == nil:
|
||||||
|
raise newException(NilAccessDefect, "extracting from nil UniquePtr")
|
||||||
|
# Move the value out
|
||||||
|
var isolated = isolate(p.data[])
|
||||||
|
result = extract(isolated)
|
||||||
|
# Free the shared memory
|
||||||
|
deallocShared(p.data)
|
||||||
|
p.data = nil
|
||||||
@ -63,11 +63,9 @@ suite "Test Poseidon2Tree":
|
|||||||
tree == fromNodes
|
tree == fromNodes
|
||||||
|
|
||||||
test "Build poseidon2 tree from poseidon2 leaves asynchronously":
|
test "Build poseidon2 tree from poseidon2 leaves asynchronously":
|
||||||
echo "Build poseidon2 tree from poseidon2 leaves asynchronously"
|
|
||||||
var tp = Taskpool.new()
|
var tp = Taskpool.new()
|
||||||
defer:
|
defer:
|
||||||
tp.shutdown()
|
tp.shutdown()
|
||||||
echo "@@@@@"
|
|
||||||
|
|
||||||
let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet()
|
let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet()
|
||||||
check:
|
check:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user