mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-05 06:53:06 +00:00
use uniqueptr for sharing data across threads
This commit is contained in:
parent
00a0381ba2
commit
63fee7e90c
@ -32,6 +32,7 @@ import ../utils/asynciter
|
||||
import ../indexingstrategy
|
||||
import ../errors
|
||||
import ../utils/arrayutils
|
||||
import ../utils/uniqueptr
|
||||
|
||||
import pkg/stew/byteutils
|
||||
|
||||
@ -98,7 +99,7 @@ type
|
||||
success: Atomic[bool]
|
||||
erasure: ptr Erasure
|
||||
blocks: seq[seq[byte]]
|
||||
parity: Isolated[seq[seq[byte]]]
|
||||
parity: UniquePtr[seq[seq[byte]]]
|
||||
blockSize, parityLen: int
|
||||
signal: ThreadSignalPtr
|
||||
|
||||
@ -107,7 +108,7 @@ type
|
||||
erasure: ptr Erasure
|
||||
blocks: seq[seq[byte]]
|
||||
parity: seq[seq[byte]]
|
||||
recovered: Isolated[seq[seq[byte]]]
|
||||
recovered: UniquePtr[seq[seq[byte]]]
|
||||
blockSize, recoveredLen: int
|
||||
signal: ThreadSignalPtr
|
||||
|
||||
@ -311,8 +312,12 @@ proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
|
||||
|
||||
task[].success.store(false)
|
||||
else:
|
||||
var isolatedParity = isolate(parity)
|
||||
task[].parity = move isolatedParity
|
||||
var isolatedSeq = newSeq[seq[byte]](task[].parityLen)
|
||||
for i in 0 ..< task[].parityLen:
|
||||
var innerSeq = isolate(parity[i])
|
||||
isolatedSeq[i] = extract(innerSeq)
|
||||
|
||||
task[].parity = newUniquePtr(isolatedSeq)
|
||||
task[].success.store(true)
|
||||
|
||||
proc asyncEncode*(
|
||||
@ -349,12 +354,7 @@ proc asyncEncode*(
|
||||
if not task.success.load():
|
||||
return failure("Leopard encoding task failed")
|
||||
|
||||
defer:
|
||||
task.parity = default(Isolated[seq[seq[byte]]])
|
||||
|
||||
var parity = task.parity.extract
|
||||
|
||||
success parity
|
||||
success extractValue(task.parity)
|
||||
|
||||
proc encodeData(
|
||||
self: ErasureRef, manifest: Manifest, params: EncodingParams
|
||||
@ -474,8 +474,12 @@ proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
|
||||
warn "Error from leopard decoder backend!", error = $res.error
|
||||
task[].success.store(false)
|
||||
else:
|
||||
var isolatedRecovered = isolate(recovered)
|
||||
task[].recovered = move isolatedRecovered
|
||||
var isolatedSeq = newSeq[seq[byte]](task[].blocks.len)
|
||||
for i in 0 ..< task[].blocks.len:
|
||||
var innerSeq = isolate(recovered[i])
|
||||
isolatedSeq[i] = extract(innerSeq)
|
||||
|
||||
task[].recovered = newUniquePtr(isolatedSeq)
|
||||
task[].success.store(true)
|
||||
|
||||
proc asyncDecode*(
|
||||
@ -509,15 +513,10 @@ proc asyncDecode*(
|
||||
|
||||
return failure(err)
|
||||
|
||||
defer:
|
||||
task.recovered = default(Isolated[seq[seq[byte]]])
|
||||
|
||||
if not task.success.load():
|
||||
return failure("Leopard decoding task failed")
|
||||
|
||||
var recovered = task.recovered.extract
|
||||
|
||||
success(recovered)
|
||||
success extractValue(task.recovered)
|
||||
|
||||
proc decodeInternal(
|
||||
self: ErasureRef, encoded: Manifest
|
||||
|
||||
61
codex/utils/uniqueptr.nim
Normal file
61
codex/utils/uniqueptr.nim
Normal file
@ -0,0 +1,61 @@
|
||||
import std/isolation
|
||||
type UniquePtr*[T] = object
|
||||
## A unique pointer to a seq[seq[T]] in shared memory
|
||||
## Can only be moved, not copied
|
||||
data: ptr T
|
||||
|
||||
template newUniquePtr*[T](data: T): UniquePtr[T] =
|
||||
newUniquePtr(isolate(data))
|
||||
|
||||
proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] =
|
||||
## Creates a new unique sequence in shared memory
|
||||
## The memory is automatically freed when the object is destroyed
|
||||
result.data = cast[ptr T](allocShared0(sizeof(T)))
|
||||
|
||||
result.data[] = extract(data)
|
||||
|
||||
proc `=destroy`*[T](p: var UniquePtr[T]) =
|
||||
## Destructor for UniquePtr
|
||||
if p.data != nil:
|
||||
deallocShared(p.data)
|
||||
p.data = nil
|
||||
|
||||
proc `=copy`*[T](
|
||||
dest: var UniquePtr[T], src: UniquePtr[T]
|
||||
) {.error: "UniquePtr cannot be copied, only moved".}
|
||||
|
||||
proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) =
|
||||
if dest.data != nil:
|
||||
`=destroy`(dest)
|
||||
dest.data = src.data
|
||||
# We need to nil out the source data to prevent double-free
|
||||
# This is handled by Nim's destructive move semantics
|
||||
|
||||
proc `[]`*[T](p: UniquePtr[T]): lent T =
|
||||
## Access the data (read-only)
|
||||
if p.data == nil:
|
||||
raise newException(NilAccessDefect, "accessing nil UniquePtr")
|
||||
p.data[]
|
||||
|
||||
# proc `[]`*[T](p: var UniquePtr[T]): var T =
|
||||
# ## Access the data (mutable)
|
||||
# if p.data == nil:
|
||||
# raise newException(NilAccessDefect, "accessing nil UniquePtr")
|
||||
# p.data[]
|
||||
|
||||
proc isNil*[T](p: UniquePtr[T]): bool =
|
||||
## Check if the UniquePtr is nil
|
||||
p.data == nil
|
||||
|
||||
proc extractValue*[T](p: var UniquePtr[T]): T =
|
||||
## Extract the value from the UniquePtr and release the memory
|
||||
if p.data == nil:
|
||||
raise newException(NilAccessDefect, "extracting from nil UniquePtr")
|
||||
|
||||
# Move the value out
|
||||
var isolated = isolate(p.data[])
|
||||
result = extract(isolated)
|
||||
|
||||
# Free the shared memory
|
||||
deallocShared(p.data)
|
||||
p.data = nil
|
||||
2
vendor/nim-leopard
vendored
2
vendor/nim-leopard
vendored
@ -1 +1 @@
|
||||
Subproject commit aa5f8d7748a3299a3dbdc384f5e3fed330d30d51
|
||||
Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f
|
||||
Loading…
x
Reference in New Issue
Block a user