From 7f112566f88333472ad22cafa88e0d3d1ae34857 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 14 Sep 2023 13:52:35 -0700 Subject: [PATCH] trying out manual futures --- datastore/threadproxyds.nim | 15 -- datastore/threads/sharedptr.nim | 2 +- datastore/threads/then.nim | 209 ++++++++++++++++++++++++++++ datastore/threads/threadbackend.nim | 47 +++++-- results.rb | 112 +++++++++++++++ 5 files changed, 361 insertions(+), 24 deletions(-) create mode 100644 datastore/threads/then.nim create mode 100644 results.rb diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 20fe110..139caa7 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -94,21 +94,6 @@ method put*( block: put(ret, self.tds, key, data) - echo "\n" - echoed "wait put thr: ", $getThreadId() - echo "\n" - await sleepAsync(400.milliseconds) - await wait(ret) - echo "\n" - await sleepAsync(400.milliseconds) - - answer = ret.convert(void) - block: - echo "\n" - await sleepAsync(400.milliseconds) - echoed "PUT RELEASE" - echo "PUT RELEASE" - ret.release() return answer diff --git a/datastore/threads/sharedptr.nim b/datastore/threads/sharedptr.nim index 859df5c..cede20c 100644 --- a/datastore/threads/sharedptr.nim +++ b/datastore/threads/sharedptr.nim @@ -69,7 +69,7 @@ proc decr*[T](x: var SharedPtr[T]) = else: echoed "SharedPtr: decr: ", x.container.pointer.repr, " cnt: ", x.container.cnt, " tp: ", $(typeof(T)) -template release*[T](x: var SharedPtr[T]) = +proc release*[T](x: var SharedPtr[T]) = echoed "SharedPtr: release: ", $(typeof(T)) x.decr() x.container = nil diff --git a/datastore/threads/then.nim b/datastore/threads/then.nim new file mode 100644 index 0000000..d98d461 --- /dev/null +++ b/datastore/threads/then.nim @@ -0,0 +1,209 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/upraises + +## duplicate of codex/utils/then.nim + +# Similar to JavaScript's Promise API, `.then` and `.catch` can be used to +# handle results and errors of async `Futures` within a synchronous closure. +# They can be used as an alternative to `asyncSpawn` which does not return a +# value and will raise a `FutureDefect` if there are unhandled errors +# encountered. Both `.then` and `.catch` act as callbacks that do not block the +# synchronous closure's flow. + +# `.then` is called when the `Future` is successfully completed and can be +# chained as many times as desired, calling each `.then` callback in order. When +# the `Future` returns `Result[T, ref CatchableError]` (or `?!T`), the value +# called in the `.then` callback will be unpacked from the `Result` as a +# convenience. In other words, for `Future[?!T]`, the `.then` callback will take +# a single parameter `T`. See `tests/utils/testthen.nim` for more examples. To +# allow for chaining, `.then` returns its future. If the future is already +# complete, the `.then` callback will be executed immediately. + +# `.catch` is called when the `Future` fails. In the case when the `Future` +# returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called +# if the `Result` contains an error. If the `Future` is already failed (or +# `Future[?!T]` contains an error), the `.catch` callback will be executed +# immediately. + +# `.cancelled` is called when the `Future` is cancelled. If the `Future` is +# already cancelled, the `.cancelled` callback will be executed immediately. + +# More info on JavaScript's Promise API can be found at: +# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise + +runnableExamples: + proc asyncProc(): Future[int] {.async.} = + await sleepAsync(1.millis) + return 1 + + asyncProc() + .then(proc(i: int) = echo "returned ", i) + .catch(proc(e: ref CatchableError) = doAssert false, "will not be triggered") + + # outputs "returned 1" + + proc asyncProcWithError(): Future[int] {.async.} = + await sleepAsync(1.millis) + raise newException(ValueError, "some error") + + asyncProcWithError() + .then(proc(i: int) = doAssert false, "will not be triggered") + .catch(proc(e: ref CatchableError) = echo "errored: ", e.msg) + + # outputs "errored: some error" + +type + OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].} + OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].} + OnCancelled* = proc() {.gcsafe, upraises: [].} + +proc ignoreError(err: ref CatchableError) = discard +proc ignoreCancelled() = discard + +template handleFinished(future: FutureBase, + onError: OnError, + onCancelled: OnCancelled) = + + if not future.finished: + return + + if future.cancelled: + onCancelled() + return + + if future.failed: + onError(future.error) + return + +proc then*(future: Future[void], onSuccess: OnSuccess[void]): Future[void] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, ignoreCancelled) + onSuccess() + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*[T](future: Future[T], onSuccess: OnSuccess[T]): Future[T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, ignoreCancelled) + + if val =? future.read.catch: + onSuccess(val) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*[T](future: Future[?!T], onSuccess: OnSuccess[T]): Future[?!T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, ignoreCancelled) + + try: + if val =? future.read: + onSuccess(val) + except CatchableError as e: + ignoreError(e) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc then*(future: Future[?!void], onSuccess: OnSuccess[void]): Future[?!void] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, ignoreCancelled) + + try: + if future.read.isOk: + onSuccess() + except CatchableError as e: + ignoreError(e) + return + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc catch*[T](future: Future[T], onError: OnError) = + + if future.isNil: return + + proc cb(udata: pointer) = + future.handleFinished(onError, ignoreCancelled) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + +proc catch*[T](future: Future[?!T], onError: OnError) = + + if future.isNil: return + + proc cb(udata: pointer) = + future.handleFinished(onError, ignoreCancelled) + + try: + if err =? future.read.errorOption: + onError(err) + except CatchableError as e: + onError(e) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + + future.addCallback(cb) + future.cancelCallback = cancellation + +proc cancelled*[T](future: Future[T], onCancelled: OnCancelled): Future[T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, onCancelled) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + onCancelled() + + future.addCallback(cb) + future.cancelCallback = cancellation + return future + +proc cancelled*[T](future: Future[?!T], onCancelled: OnCancelled): Future[?!T] = + + proc cb(udata: pointer) = + future.handleFinished(ignoreError, onCancelled) + + proc cancellation(udata: pointer) = + if not future.finished(): + future.removeCallback(cb) + onCancelled() + + future.addCallback(cb) + future.cancelCallback = cancellation + return future \ No newline at end of file diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index a33d610..0e69f2e 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -122,6 +122,7 @@ proc get*( import std/os proc putTask*( + sig: SharedSignal, ret: TResult[void], tds: ThreadDatastorePtr, kb: KeyBuffer, @@ -145,22 +146,52 @@ proc putTask*( else: ret.success() - discard ret.fireSync() + discard sig.fireSync() # ret.release() echoed "putTask: FINISH\n" +import then + proc put*( - ret: var TResult[void], tds: ThreadDatastorePtr, key: Key, data: seq[byte] -) = - echoed "put request args: ", $getThreadId() - let bkey = KeyBuffer.new(key) - let bval = DataBuffer.new(data) +): Future[?!void] = - echoed "spawn put request: ", $getThreadId() - tds[].tp.spawn putTask(ret, tds, bkey, bval) + echoed "put request args: ", $getThreadId() + + var putRes = newFuture[?!void]("threadbackend.put(tds, key, data)") + let sigFut = SharedSignal.new() + + sigFut. + then(proc (sig: SharedSignal) = + echoed "got tresFut" + let + ret = newSharedPtr(ThreadResult[void]) + bkey = KeyBuffer.new(key) + bval = DataBuffer.new(data) + + echoed "spawn put request: ", $getThreadId() + # this spawns the taskpool Task + # but we can't wait on it directly - we use wait(ret[].sig) + tds[].tp.spawn putTask(sig, ret, tds, bkey, bval) + + wait(sig). + then(proc () = + var ret = ret + let val = ret.convert(void) + putRes.complete(val) + ).cancelled(proc() = + discard + ).catch(proc(e: ref CatchableError) = + doAssert false, "will not be triggered" + ) + ).catch(proc(e: ref CatchableError) = + echoed "err tresFut" + var res: ?!void + res.err(e) + putRes.complete(res) + ) proc deleteTask*( diff --git a/results.rb b/results.rb new file mode 100644 index 0000000..c8fe3e2 --- /dev/null +++ b/results.rb @@ -0,0 +1,112 @@ +[Suite] Share buffer test +SharedPtr: alloc: 0x102508090 tp: DataBufferHolder +SharedPtr: alloc: 0x1025080f0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508150 tp: DataBufferHolder +SharedPtr: alloc: 0x1025081b0 tp: DataBufferHolder +SharedPtr: destroy: 0x102498180"0x1025081b0" cnt: 0x102508190 tp: DataBufferHolder +SharedPtr: decr: 0x1025081b0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102498330"0x102508150" cnt: 0x102508130 tp: DataBufferHolder +SharedPtr: decr: 0x102508150 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024984e0"0x1025080f0" cnt: 0x1025080d0 tp: DataBufferHolder +SharedPtr: decr: 0x1025080f0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102498690"0x102508090" cnt: 0x102508070 tp: DataBufferHolder +SharedPtr: decr: 0x102508090 -1 tp: DataBufferHolder + [OK] creation +SharedPtr: alloc: 0x102508210 tp: DataBufferHolder +SharedPtr: alloc: 0x102508270 tp: DataBufferHolder +SharedPtr: alloc: 0x1025082d0 tp: DataBufferHolder +SharedPtr: destroy: 0x10249a990"0x1025082d0" cnt: 0x1025082b0 tp: DataBufferHolder +SharedPtr: decr: 0x1025082d0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x10249ab40"0x102508270" cnt: 0x102508250 tp: DataBufferHolder +SharedPtr: decr: 0x102508270 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x10249acf0"0x102508210" cnt: 0x1025081f0 tp: DataBufferHolder +SharedPtr: decr: 0x102508210 -1 tp: DataBufferHolder + [OK] equality +SharedPtr: alloc: 0x102508330 tp: DataBufferHolder +SharedPtr: alloc: 0x102508390 tp: DataBufferHolder +SharedPtr: alloc: 0x1025083f0 tp: DataBufferHolder +SharedPtr: destroy: 0x102498240"0x1025083f0" cnt: 0x1025083d0 tp: DataBufferHolder +SharedPtr: decr: 0x1025083f0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024983f0"0x102508390" cnt: 0x102508370 tp: DataBufferHolder +SharedPtr: decr: 0x102508390 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024985a0"0x102508330" cnt: 0x102508310 tp: DataBufferHolder +SharedPtr: decr: 0x102508330 -1 tp: DataBufferHolder + [OK] toString +SharedPtr: alloc: 0x102508450 tp: DataBufferHolder +SharedPtr: alloc: 0x1025084b0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508510 tp: DataBufferHolder +SharedPtr: destroy: 0x102493ab0"0x102508510" cnt: 0x1025084f0 tp: DataBufferHolder +SharedPtr: decr: 0x102508510 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102493cc0"0x1025084b0" cnt: 0x102508490 tp: DataBufferHolder +SharedPtr: decr: 0x1025084b0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102493e40"0x102508450" cnt: 0x102508430 tp: DataBufferHolder +SharedPtr: decr: 0x102508450 -1 tp: DataBufferHolder + [OK] hash +SharedPtr: alloc: 0x102508570 tp: DataBufferHolder +SharedPtr: alloc: 0x1025085d0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508630 tp: DataBufferHolder +SharedPtr: destroy: 0x102499480"0x102508630" cnt: 0x102508610 tp: DataBufferHolder +SharedPtr: decr: 0x102508630 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102499630"0x1025085d0" cnt: 0x1025085b0 tp: DataBufferHolder +SharedPtr: decr: 0x1025085d0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024997e0"0x102508570" cnt: 0x102508550 tp: DataBufferHolder +SharedPtr: decr: 0x102508570 -1 tp: DataBufferHolder + [OK] hashes differ +SharedPtr: alloc: 0x102508690 tp: DataBufferHolder +SharedPtr: alloc: 0x1025086f0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508750 tp: DataBufferHolder +SharedPtr: destroy: 0x10249dd20"0x102508750" cnt: 0x102508730 tp: DataBufferHolder +SharedPtr: decr: 0x102508750 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x10249df30"0x1025086f0" cnt: 0x1025086d0 tp: DataBufferHolder +SharedPtr: decr: 0x1025086f0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024931b0"0x102508690" cnt: 0x102508670 tp: DataBufferHolder +SharedPtr: decr: 0x102508690 -1 tp: DataBufferHolder + [OK] key conversion +SharedPtr: alloc: 0x1025087b0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508810 tp: DataBufferHolder +SharedPtr: alloc: 0x102508870 tp: DataBufferHolder +SharedPtr: destroy: 0x1024975d0"0x102508870" cnt: 0x102508850 tp: DataBufferHolder +SharedPtr: decr: 0x102508870 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102497780"0x102508810" cnt: 0x1025087f0 tp: DataBufferHolder +SharedPtr: decr: 0x102508810 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102497b70"0x1025087b0" cnt: 0x102508790 tp: DataBufferHolder +SharedPtr: decr: 0x1025087b0 -1 tp: DataBufferHolder + [OK] seq conversion +SharedPtr: alloc: 0x1025088d0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508930 tp: DataBufferHolder +SharedPtr: alloc: 0x102508990 tp: DataBufferHolder +SharedPtr: destroy: 0x10249ff00"0x102508990" cnt: 0x102508970 tp: DataBufferHolder +SharedPtr: decr: 0x102508990 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102498210"0x102508930" cnt: 0x102508910 tp: DataBufferHolder +SharedPtr: decr: 0x102508930 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x1024981b0"0x1025088d0" cnt: 0x1025088b0 tp: DataBufferHolder +SharedPtr: decr: 0x1025088d0 -1 tp: DataBufferHolder + [OK] seq conversion +SharedPtr: alloc: 0x1025089f0 tp: DataBufferHolder +SharedPtr: alloc: 0x102508a50 tp: DataBufferHolder +SharedPtr: alloc: 0x102508ab0 tp: DataBufferHolder +running +thread1 +thread2 +SharedPtr: alloc: 0x102508b30 tp: DataBufferHolder +SharedPtr: alloc: 0x102508b90 tp: DataBufferHolder +SharedPtr: destroy: 0x1025a0150"0x102508b30" cnt: 0x102508b10 tp: DataBufferHolder +SharedPtr: decr: 0x102508b30 -1 tp: DataBufferHolder +thread1: sending: (val: (buf: ..., size: 11)) +mybytes2: (val: (buf: ..., size: 11)) +thread1: sent, left over: (val: (buf: ..., size: 11)) +thread2: receiving +thread2: received: (val: (buf: ..., size: 11)) +SharedPtr: destroy: 0x102620120"0x102508b90" cnt: nil tp: DataBufferHolder +SharedPtr: destroy: 0x1025a0450"0x102508b90" cnt: 0x102508b70 tp: DataBufferHolder +SharedPtr: decr: 0x102508b90 1 tp: DataBufferHolder +SharedPtr: destroy: 0x1025a0600"0x102508b90" cnt: 0x102508b10 tp: DataBufferHolder +SharedPtr: decr: 0x102508b90 -2 tp: DataBufferHolder +SharedPtr: destroy: 0x1024936f0"0x102508ab0" cnt: 0x102508a90 tp: DataBufferHolder +SharedPtr: decr: 0x102508ab0 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102493cf0"0x102508a50" cnt: 0x102508a30 tp: DataBufferHolder +SharedPtr: decr: 0x102508a50 -1 tp: DataBufferHolder +SharedPtr: destroy: 0x102493960"0x1025089f0" cnt: 0x1025089d0 tp: DataBufferHolder +SharedPtr: decr: 0x1025089f0 -1 tp: DataBufferHolder + [OK] basic thread test +SharedPtr: destroy: 0x102493f00"0x102508b90" cnt: nil tp: DataBufferHolder \ No newline at end of file