mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 16:43:10 +00:00
initial data buffer
This commit is contained in:
parent
0cde8aeb67
commit
81eb8034da
1
.gitignore
vendored
1
.gitignore
vendored
@ -9,3 +9,4 @@ coverage
|
|||||||
datastore.nims
|
datastore.nims
|
||||||
nimcache
|
nimcache
|
||||||
TODO
|
TODO
|
||||||
|
nim.cfg
|
||||||
|
|||||||
74
datastore/databuffer.nim
Normal file
74
datastore/databuffer.nim
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import std/os
|
||||||
|
import std/locks
|
||||||
|
import std/atomics
|
||||||
|
|
||||||
|
import events
|
||||||
|
|
||||||
|
|
||||||
|
type
|
||||||
|
DataBuffer* = object
|
||||||
|
cnt: ptr int
|
||||||
|
buf: ptr UncheckedArray[byte]
|
||||||
|
size: int
|
||||||
|
|
||||||
|
proc `$`*(data: DataBuffer): string =
|
||||||
|
if data.buf.isNil:
|
||||||
|
result = "nil"
|
||||||
|
else:
|
||||||
|
let sz = min(16, data.size)
|
||||||
|
result = newString(sz + 2)
|
||||||
|
copyMem(addr result[1], data.buf, sz)
|
||||||
|
result[0] = '<'
|
||||||
|
result[^1] = '>'
|
||||||
|
|
||||||
|
proc `=destroy`*(x: var DataBuffer) =
|
||||||
|
## copy pointer implementation
|
||||||
|
if x.buf != nil and x.cnt != nil:
|
||||||
|
let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE)
|
||||||
|
if res == 0:
|
||||||
|
when isMainModule:
|
||||||
|
echo "buffer: FREE: ", repr x.buf.pointer, " ", x.cnt[]
|
||||||
|
deallocShared(x.buf)
|
||||||
|
deallocShared(x.cnt)
|
||||||
|
else:
|
||||||
|
when isMainModule:
|
||||||
|
echo "buffer: decr: ", repr x.buf.pointer, " ", x.cnt[]
|
||||||
|
|
||||||
|
proc `=copy`*(a: var DataBuffer; b: DataBuffer) =
|
||||||
|
## copy pointer implementation
|
||||||
|
|
||||||
|
# do nothing for self-assignments:
|
||||||
|
if a.buf == b.buf: return
|
||||||
|
`=destroy`(a)
|
||||||
|
discard atomicAddFetch(b.cnt, 1, ATOMIC_RELAXED)
|
||||||
|
a.size = b.size
|
||||||
|
a.buf = b.buf
|
||||||
|
a.cnt = b.cnt
|
||||||
|
when isMainModule:
|
||||||
|
echo "buffer: Copy: repr: ", b.cnt[],
|
||||||
|
" ", repr a.buf.pointer,
|
||||||
|
" ", repr b.buf.pointer
|
||||||
|
|
||||||
|
proc incrAtomicCount*(a: DataBuffer) =
|
||||||
|
let res = atomicAddFetch(a.cnt, 1, ATOMIC_RELAXED)
|
||||||
|
proc getAtomicCount*(a: DataBuffer): int =
|
||||||
|
atomicLoad(a.cnt, addr result, ATOMIC_RELAXED)
|
||||||
|
|
||||||
|
proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer =
|
||||||
|
let cnt = cast[ptr int](allocShared0(sizeof(result.cnt)))
|
||||||
|
cnt[] = 1
|
||||||
|
DataBuffer(
|
||||||
|
cnt: cnt,
|
||||||
|
buf: cast[typeof(result.buf)](allocShared0(size)),
|
||||||
|
size: size,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer =
|
||||||
|
## allocate new buffer and copies indata from openArray
|
||||||
|
##
|
||||||
|
result = DataBuffer.new(data.len)
|
||||||
|
if data.len() > 0:
|
||||||
|
copyMem(result.buf, unsafeAddr data[0], data.len)
|
||||||
82
tests/datastore/testsharedbuffer.nim
Normal file
82
tests/datastore/testsharedbuffer.nim
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
import std/options
|
||||||
|
import std/sequtils
|
||||||
|
import std/algorithm
|
||||||
|
import std/locks
|
||||||
|
import std/os
|
||||||
|
|
||||||
|
import pkg/unittest2
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
include ../../datastore/databuffer
|
||||||
|
|
||||||
|
type
|
||||||
|
AtomicFreed* = ptr int
|
||||||
|
|
||||||
|
proc newFreedValue*(val = 0): ptr int =
|
||||||
|
result = cast[ptr int](alloc0(sizeof(int)))
|
||||||
|
result[] = val
|
||||||
|
|
||||||
|
proc getFreedValue*(x: ptr int): int =
|
||||||
|
atomicLoad(x, addr result, ATOMIC_ACQUIRE)
|
||||||
|
|
||||||
|
proc incrFreedValue*(x: ptr int): int =
|
||||||
|
atomicAddFetch(x, 1, ATOMIC_ACQUIRE)
|
||||||
|
|
||||||
|
proc decrFreedValue*(x: ptr int): int =
|
||||||
|
atomicSubFetch(x, 1, ATOMIC_ACQUIRE)
|
||||||
|
|
||||||
|
|
||||||
|
var
|
||||||
|
shareVal: DataBuffer
|
||||||
|
lock: Lock
|
||||||
|
cond: Cond
|
||||||
|
|
||||||
|
var threads: array[2,Thread[int]]
|
||||||
|
|
||||||
|
proc thread1(val: int) {.thread.} =
|
||||||
|
echo "thread1"
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
for i in 0..<val:
|
||||||
|
os.sleep(20)
|
||||||
|
var myBytes2 = DataBuffer.new()
|
||||||
|
var myBytes = DataBuffer.new(@"hello world")
|
||||||
|
myBytes2 = myBytes
|
||||||
|
|
||||||
|
echo "thread1: sending: ", myBytes, " cnt: ", myBytes.getAtomicCount()
|
||||||
|
echo "mybytes2: ", myBytes2, " cnt: ", myBytes2.getAtomicCount()
|
||||||
|
|
||||||
|
shareVal = myBytes
|
||||||
|
echo "thread1: sent, left over: ", $myBytes
|
||||||
|
lock.withLock:
|
||||||
|
signal(cond)
|
||||||
|
os.sleep(10)
|
||||||
|
|
||||||
|
proc thread2(val: int) {.thread.} =
|
||||||
|
echo "thread2"
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
for i in 0..<val:
|
||||||
|
lock.withLock:
|
||||||
|
wait(cond, lock)
|
||||||
|
echo "thread2: receiving "
|
||||||
|
let msg: DataBuffer = shareVal
|
||||||
|
echo "thread2: received: ", msg, " cnt: ", msg.getAtomicCount()
|
||||||
|
# os.sleep(100)
|
||||||
|
|
||||||
|
proc runBasicTest() =
|
||||||
|
echo "running"
|
||||||
|
|
||||||
|
lock.initLock()
|
||||||
|
cond.initCond()
|
||||||
|
|
||||||
|
let n = 1
|
||||||
|
createThread(threads[0], thread1, n)
|
||||||
|
createThread(threads[1], thread2, n)
|
||||||
|
|
||||||
|
joinThreads(threads)
|
||||||
|
|
||||||
|
suite "Share buffer test":
|
||||||
|
|
||||||
|
test "basic test":
|
||||||
|
runBasicTest()
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user