mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-02-08 07:43:12 +00:00
trying out manual futures
This commit is contained in:
parent
850a1b7b7a
commit
7f112566f8
@ -94,21 +94,6 @@ method put*(
|
||||
|
||||
block:
|
||||
put(ret, self.tds, key, data)
|
||||
echo "\n"
|
||||
echoed "wait put thr: ", $getThreadId()
|
||||
echo "\n"
|
||||
await sleepAsync(400.milliseconds)
|
||||
await wait(ret)
|
||||
echo "\n"
|
||||
await sleepAsync(400.milliseconds)
|
||||
|
||||
answer = ret.convert(void)
|
||||
block:
|
||||
echo "\n"
|
||||
await sleepAsync(400.milliseconds)
|
||||
echoed "PUT RELEASE"
|
||||
echo "PUT RELEASE"
|
||||
ret.release()
|
||||
|
||||
return answer
|
||||
|
||||
|
||||
@ -69,7 +69,7 @@ proc decr*[T](x: var SharedPtr[T]) =
|
||||
else:
|
||||
echoed "SharedPtr: decr: ", x.container.pointer.repr, " cnt: ", x.container.cnt, " tp: ", $(typeof(T))
|
||||
|
||||
template release*[T](x: var SharedPtr[T]) =
|
||||
proc release*[T](x: var SharedPtr[T]) =
|
||||
echoed "SharedPtr: release: ", $(typeof(T))
|
||||
x.decr()
|
||||
x.container = nil
|
||||
|
||||
209
datastore/threads/then.nim
Normal file
209
datastore/threads/then.nim
Normal file
@ -0,0 +1,209 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
|
||||
## duplicate of codex/utils/then.nim
|
||||
|
||||
# Similar to JavaScript's Promise API, `.then` and `.catch` can be used to
|
||||
# handle results and errors of async `Futures` within a synchronous closure.
|
||||
# They can be used as an alternative to `asyncSpawn` which does not return a
|
||||
# value and will raise a `FutureDefect` if there are unhandled errors
|
||||
# encountered. Both `.then` and `.catch` act as callbacks that do not block the
|
||||
# synchronous closure's flow.
|
||||
|
||||
# `.then` is called when the `Future` is successfully completed and can be
|
||||
# chained as many times as desired, calling each `.then` callback in order. When
|
||||
# the `Future` returns `Result[T, ref CatchableError]` (or `?!T`), the value
|
||||
# called in the `.then` callback will be unpacked from the `Result` as a
|
||||
# convenience. In other words, for `Future[?!T]`, the `.then` callback will take
|
||||
# a single parameter `T`. See `tests/utils/testthen.nim` for more examples. To
|
||||
# allow for chaining, `.then` returns its future. If the future is already
|
||||
# complete, the `.then` callback will be executed immediately.
|
||||
|
||||
# `.catch` is called when the `Future` fails. In the case when the `Future`
|
||||
# returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called
|
||||
# if the `Result` contains an error. If the `Future` is already failed (or
|
||||
# `Future[?!T]` contains an error), the `.catch` callback will be executed
|
||||
# immediately.
|
||||
|
||||
# `.cancelled` is called when the `Future` is cancelled. If the `Future` is
|
||||
# already cancelled, the `.cancelled` callback will be executed immediately.
|
||||
|
||||
# More info on JavaScript's Promise API can be found at:
|
||||
# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
|
||||
|
||||
runnableExamples:
|
||||
proc asyncProc(): Future[int] {.async.} =
|
||||
await sleepAsync(1.millis)
|
||||
return 1
|
||||
|
||||
asyncProc()
|
||||
.then(proc(i: int) = echo "returned ", i)
|
||||
.catch(proc(e: ref CatchableError) = doAssert false, "will not be triggered")
|
||||
|
||||
# outputs "returned 1"
|
||||
|
||||
proc asyncProcWithError(): Future[int] {.async.} =
|
||||
await sleepAsync(1.millis)
|
||||
raise newException(ValueError, "some error")
|
||||
|
||||
asyncProcWithError()
|
||||
.then(proc(i: int) = doAssert false, "will not be triggered")
|
||||
.catch(proc(e: ref CatchableError) = echo "errored: ", e.msg)
|
||||
|
||||
# outputs "errored: some error"
|
||||
|
||||
type
|
||||
OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].}
|
||||
OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].}
|
||||
OnCancelled* = proc() {.gcsafe, upraises: [].}
|
||||
|
||||
proc ignoreError(err: ref CatchableError) = discard
|
||||
proc ignoreCancelled() = discard
|
||||
|
||||
template handleFinished(future: FutureBase,
|
||||
onError: OnError,
|
||||
onCancelled: OnCancelled) =
|
||||
|
||||
if not future.finished:
|
||||
return
|
||||
|
||||
if future.cancelled:
|
||||
onCancelled()
|
||||
return
|
||||
|
||||
if future.failed:
|
||||
onError(future.error)
|
||||
return
|
||||
|
||||
proc then*(future: Future[void], onSuccess: OnSuccess[void]): Future[void] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, ignoreCancelled)
|
||||
onSuccess()
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
|
||||
proc then*[T](future: Future[T], onSuccess: OnSuccess[T]): Future[T] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, ignoreCancelled)
|
||||
|
||||
if val =? future.read.catch:
|
||||
onSuccess(val)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
|
||||
proc then*[T](future: Future[?!T], onSuccess: OnSuccess[T]): Future[?!T] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, ignoreCancelled)
|
||||
|
||||
try:
|
||||
if val =? future.read:
|
||||
onSuccess(val)
|
||||
except CatchableError as e:
|
||||
ignoreError(e)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
|
||||
proc then*(future: Future[?!void], onSuccess: OnSuccess[void]): Future[?!void] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, ignoreCancelled)
|
||||
|
||||
try:
|
||||
if future.read.isOk:
|
||||
onSuccess()
|
||||
except CatchableError as e:
|
||||
ignoreError(e)
|
||||
return
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
|
||||
proc catch*[T](future: Future[T], onError: OnError) =
|
||||
|
||||
if future.isNil: return
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(onError, ignoreCancelled)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
|
||||
proc catch*[T](future: Future[?!T], onError: OnError) =
|
||||
|
||||
if future.isNil: return
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(onError, ignoreCancelled)
|
||||
|
||||
try:
|
||||
if err =? future.read.errorOption:
|
||||
onError(err)
|
||||
except CatchableError as e:
|
||||
onError(e)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
|
||||
proc cancelled*[T](future: Future[T], onCancelled: OnCancelled): Future[T] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, onCancelled)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
onCancelled()
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
|
||||
proc cancelled*[T](future: Future[?!T], onCancelled: OnCancelled): Future[?!T] =
|
||||
|
||||
proc cb(udata: pointer) =
|
||||
future.handleFinished(ignoreError, onCancelled)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
if not future.finished():
|
||||
future.removeCallback(cb)
|
||||
onCancelled()
|
||||
|
||||
future.addCallback(cb)
|
||||
future.cancelCallback = cancellation
|
||||
return future
|
||||
@ -122,6 +122,7 @@ proc get*(
|
||||
import std/os
|
||||
|
||||
proc putTask*(
|
||||
sig: SharedSignal,
|
||||
ret: TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
kb: KeyBuffer,
|
||||
@ -145,22 +146,52 @@ proc putTask*(
|
||||
else:
|
||||
ret.success()
|
||||
|
||||
discard ret.fireSync()
|
||||
discard sig.fireSync()
|
||||
# ret.release()
|
||||
echoed "putTask: FINISH\n"
|
||||
|
||||
import then
|
||||
|
||||
proc put*(
|
||||
ret: var TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
key: Key,
|
||||
data: seq[byte]
|
||||
) =
|
||||
echoed "put request args: ", $getThreadId()
|
||||
let bkey = KeyBuffer.new(key)
|
||||
let bval = DataBuffer.new(data)
|
||||
): Future[?!void] =
|
||||
|
||||
echoed "spawn put request: ", $getThreadId()
|
||||
tds[].tp.spawn putTask(ret, tds, bkey, bval)
|
||||
echoed "put request args: ", $getThreadId()
|
||||
|
||||
var putRes = newFuture[?!void]("threadbackend.put(tds, key, data)")
|
||||
let sigFut = SharedSignal.new()
|
||||
|
||||
sigFut.
|
||||
then(proc (sig: SharedSignal) =
|
||||
echoed "got tresFut"
|
||||
let
|
||||
ret = newSharedPtr(ThreadResult[void])
|
||||
bkey = KeyBuffer.new(key)
|
||||
bval = DataBuffer.new(data)
|
||||
|
||||
echoed "spawn put request: ", $getThreadId()
|
||||
# this spawns the taskpool Task
|
||||
# but we can't wait on it directly - we use wait(ret[].sig)
|
||||
tds[].tp.spawn putTask(sig, ret, tds, bkey, bval)
|
||||
|
||||
wait(sig).
|
||||
then(proc () =
|
||||
var ret = ret
|
||||
let val = ret.convert(void)
|
||||
putRes.complete(val)
|
||||
).cancelled(proc() =
|
||||
discard
|
||||
).catch(proc(e: ref CatchableError) =
|
||||
doAssert false, "will not be triggered"
|
||||
)
|
||||
).catch(proc(e: ref CatchableError) =
|
||||
echoed "err tresFut"
|
||||
var res: ?!void
|
||||
res.err(e)
|
||||
putRes.complete(res)
|
||||
)
|
||||
|
||||
|
||||
proc deleteTask*(
|
||||
|
||||
112
results.rb
Normal file
112
results.rb
Normal file
@ -0,0 +1,112 @@
|
||||
[Suite] Share buffer test
|
||||
SharedPtr: alloc: 0x102508090 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025080f0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508150 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025081b0 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102498180"0x1025081b0" cnt: 0x102508190 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025081b0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102498330"0x102508150" cnt: 0x102508130 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508150 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024984e0"0x1025080f0" cnt: 0x1025080d0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025080f0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102498690"0x102508090" cnt: 0x102508070 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508090 -1 tp: DataBufferHolder
|
||||
[OK] creation
|
||||
SharedPtr: alloc: 0x102508210 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508270 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025082d0 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249a990"0x1025082d0" cnt: 0x1025082b0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025082d0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249ab40"0x102508270" cnt: 0x102508250 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508270 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249acf0"0x102508210" cnt: 0x1025081f0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508210 -1 tp: DataBufferHolder
|
||||
[OK] equality
|
||||
SharedPtr: alloc: 0x102508330 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508390 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025083f0 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102498240"0x1025083f0" cnt: 0x1025083d0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025083f0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024983f0"0x102508390" cnt: 0x102508370 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508390 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024985a0"0x102508330" cnt: 0x102508310 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508330 -1 tp: DataBufferHolder
|
||||
[OK] toString
|
||||
SharedPtr: alloc: 0x102508450 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025084b0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508510 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102493ab0"0x102508510" cnt: 0x1025084f0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508510 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102493cc0"0x1025084b0" cnt: 0x102508490 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025084b0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102493e40"0x102508450" cnt: 0x102508430 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508450 -1 tp: DataBufferHolder
|
||||
[OK] hash
|
||||
SharedPtr: alloc: 0x102508570 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025085d0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508630 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102499480"0x102508630" cnt: 0x102508610 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508630 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102499630"0x1025085d0" cnt: 0x1025085b0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025085d0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024997e0"0x102508570" cnt: 0x102508550 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508570 -1 tp: DataBufferHolder
|
||||
[OK] hashes differ
|
||||
SharedPtr: alloc: 0x102508690 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x1025086f0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508750 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249dd20"0x102508750" cnt: 0x102508730 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508750 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249df30"0x1025086f0" cnt: 0x1025086d0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025086f0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024931b0"0x102508690" cnt: 0x102508670 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508690 -1 tp: DataBufferHolder
|
||||
[OK] key conversion
|
||||
SharedPtr: alloc: 0x1025087b0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508810 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508870 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024975d0"0x102508870" cnt: 0x102508850 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508870 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102497780"0x102508810" cnt: 0x1025087f0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508810 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102497b70"0x1025087b0" cnt: 0x102508790 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025087b0 -1 tp: DataBufferHolder
|
||||
[OK] seq conversion
|
||||
SharedPtr: alloc: 0x1025088d0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508930 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508990 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x10249ff00"0x102508990" cnt: 0x102508970 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508990 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102498210"0x102508930" cnt: 0x102508910 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508930 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024981b0"0x1025088d0" cnt: 0x1025088b0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025088d0 -1 tp: DataBufferHolder
|
||||
[OK] seq conversion
|
||||
SharedPtr: alloc: 0x1025089f0 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508a50 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508ab0 tp: DataBufferHolder
|
||||
running
|
||||
thread1
|
||||
thread2
|
||||
SharedPtr: alloc: 0x102508b30 tp: DataBufferHolder
|
||||
SharedPtr: alloc: 0x102508b90 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1025a0150"0x102508b30" cnt: 0x102508b10 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508b30 -1 tp: DataBufferHolder
|
||||
thread1: sending: (val: (buf: ..., size: 11))
|
||||
mybytes2: (val: (buf: ..., size: 11))
|
||||
thread1: sent, left over: (val: (buf: ..., size: 11))
|
||||
thread2: receiving
|
||||
thread2: received: (val: (buf: ..., size: 11))
|
||||
SharedPtr: destroy: 0x102620120"0x102508b90" cnt: nil tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1025a0450"0x102508b90" cnt: 0x102508b70 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508b90 1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1025a0600"0x102508b90" cnt: 0x102508b10 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508b90 -2 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x1024936f0"0x102508ab0" cnt: 0x102508a90 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508ab0 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102493cf0"0x102508a50" cnt: 0x102508a30 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x102508a50 -1 tp: DataBufferHolder
|
||||
SharedPtr: destroy: 0x102493960"0x1025089f0" cnt: 0x1025089d0 tp: DataBufferHolder
|
||||
SharedPtr: decr: 0x1025089f0 -1 tp: DataBufferHolder
|
||||
[OK] basic thread test
|
||||
SharedPtr: destroy: 0x102493f00"0x102508b90" cnt: nil tp: DataBufferHolder
|
||||
Loading…
x
Reference in New Issue
Block a user