From 81eb8034da67b7e0dbfcc1e270614bf71f31e2ce Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 24 Aug 2023 15:19:52 -0700 Subject: [PATCH] initial data buffer --- .gitignore | 1 + datastore/databuffer.nim | 74 +++++++++++++++++++++++++ tests/datastore/testsharedbuffer.nim | 82 ++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 datastore/databuffer.nim create mode 100644 tests/datastore/testsharedbuffer.nim diff --git a/.gitignore b/.gitignore index 1a87a99..3806842 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ coverage datastore.nims nimcache TODO +nim.cfg diff --git a/datastore/databuffer.nim b/datastore/databuffer.nim new file mode 100644 index 0000000..14d7d21 --- /dev/null +++ b/datastore/databuffer.nim @@ -0,0 +1,74 @@ + + + +import std/os +import std/locks +import std/atomics + +import events + + +type + DataBuffer* = object + cnt: ptr int + buf: ptr UncheckedArray[byte] + size: int + +proc `$`*(data: DataBuffer): string = + if data.buf.isNil: + result = "nil" + else: + let sz = min(16, data.size) + result = newString(sz + 2) + copyMem(addr result[1], data.buf, sz) + result[0] = '<' + result[^1] = '>' + +proc `=destroy`*(x: var DataBuffer) = + ## copy pointer implementation + if x.buf != nil and x.cnt != nil: + let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE) + if res == 0: + when isMainModule: + echo "buffer: FREE: ", repr x.buf.pointer, " ", x.cnt[] + deallocShared(x.buf) + deallocShared(x.cnt) + else: + when isMainModule: + echo "buffer: decr: ", repr x.buf.pointer, " ", x.cnt[] + +proc `=copy`*(a: var DataBuffer; b: DataBuffer) = + ## copy pointer implementation + + # do nothing for self-assignments: + if a.buf == b.buf: return + `=destroy`(a) + discard atomicAddFetch(b.cnt, 1, ATOMIC_RELAXED) + a.size = b.size + a.buf = b.buf + a.cnt = b.cnt + when isMainModule: + echo "buffer: Copy: repr: ", b.cnt[], + " ", repr a.buf.pointer, + " ", repr b.buf.pointer + +proc incrAtomicCount*(a: DataBuffer) = + let res = atomicAddFetch(a.cnt, 1, ATOMIC_RELAXED) +proc getAtomicCount*(a: DataBuffer): int = + atomicLoad(a.cnt, addr result, ATOMIC_RELAXED) + +proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer = + let cnt = cast[ptr int](allocShared0(sizeof(result.cnt))) + cnt[] = 1 + DataBuffer( + cnt: cnt, + buf: cast[typeof(result.buf)](allocShared0(size)), + size: size, + ) + +proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer = + ## allocate new buffer and copies indata from openArray + ## + result = DataBuffer.new(data.len) + if data.len() > 0: + copyMem(result.buf, unsafeAddr data[0], data.len) diff --git a/tests/datastore/testsharedbuffer.nim b/tests/datastore/testsharedbuffer.nim new file mode 100644 index 0000000..499438a --- /dev/null +++ b/tests/datastore/testsharedbuffer.nim @@ -0,0 +1,82 @@ +import std/options +import std/sequtils +import std/algorithm +import std/locks +import std/os + +import pkg/unittest2 +import pkg/questionable +import pkg/questionable/results + +include ../../datastore/databuffer + +type + AtomicFreed* = ptr int + +proc newFreedValue*(val = 0): ptr int = + result = cast[ptr int](alloc0(sizeof(int))) + result[] = val + +proc getFreedValue*(x: ptr int): int = + atomicLoad(x, addr result, ATOMIC_ACQUIRE) + +proc incrFreedValue*(x: ptr int): int = + atomicAddFetch(x, 1, ATOMIC_ACQUIRE) + +proc decrFreedValue*(x: ptr int): int = + atomicSubFetch(x, 1, ATOMIC_ACQUIRE) + + +var + shareVal: DataBuffer + lock: Lock + cond: Cond + +var threads: array[2,Thread[int]] + +proc thread1(val: int) {.thread.} = + echo "thread1" + {.cast(gcsafe).}: + for i in 0..