mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-04 06:23:06 +00:00
203 lines
6.5 KiB
Nim
203 lines
6.5 KiB
Nim
## This file defines the Codex context and its thread flow:
## 1. The client enqueues a request and signals the Codex thread.
## 2. The Codex thread dequeues the request and sends an ack (reqReceivedSignal).
## 3. The Codex thread executes the request asynchronously.
## 4. On completion, the Codex thread invokes the client callback with the result and userData.
|
|
|
|
# Custom pragma `exported`: shorthand for `exportc`, the `cdecl` calling
# convention and an empty `raises` list.
{.pragma: exported, exportc, cdecl, raises: [].}
# Custom pragma `callback`: shorthand for the `cdecl` calling convention, an
# empty `raises` list and `gcsafe`.
{.pragma: callback, cdecl, raises: [], gcsafe.}
# Passes `-fPIC` to the C compiler.
{.passc: "-fPIC".}
|
|
|
|
import std/[options, locks, atomics]
|
|
import chronicles
|
|
import chronos
|
|
import chronos/threadsync
|
|
import taskpools/channels_spsc_single
|
|
import ./ffi_types
|
|
import ./codex_thread_requests/[codex_thread_request]
|
|
|
|
from ../codex/codex import CodexServer
|
|
|
|
type
  CodexContext* = object
    ## State shared between the client side and the Codex thread.

    thread: Thread[ptr CodexContext]
      ## The Codex thread; it receives a pointer to this context.

    lock: Lock
      ## Only needed while a single-producer channel is used and the
      ## signalling between threads assumes there are no concurrent requests.
      ## Re-architecting the signalling and migrating to a multi-producer
      ## channel would allow concurrent requests and remove the need for it.

    reqChannel: ChannelSPSCSingle[ptr CodexThreadRequest]
      ## Carries requests to the Codex thread, which pops them from here.

    reqSignal: ThreadSignalPtr
      ## Fired to tell the Codex thread that a request is ready.

    reqReceivedSignal: ThreadSignalPtr
      ## Fired by the Codex thread to acknowledge that the request was
      ## received (handshake with the client thread).

    userData*: pointer
      ## Custom client state attached to a request and handed back when its
      ## callback is invoked.

    eventCallback*: pointer
      ## Function the library calls to notify the client of global events.

    eventUserData*: pointer
      ## Custom client state attached to the context and handed back with
      ## every event callback.

    running: Atomic[bool]
      ## Set to false to stop the Codex thread (during codex_destroy).
|
|
|
|
template callEventCallback(ctx: ptr CodexContext, eventName: string, body: untyped) =
  ## Notifies the client of a global event
  ## (for example onConnectionChanged, onProofMissing, etc.).
  ##
  ## - `ctx`: context holding `eventCallback` and `eventUserData`.
  ## - `eventName`: name used in the log and error messages.
  ## - `body`: expression producing the event payload.
  ##
  ## When no event callback is registered an error is logged and the
  ## *enclosing* routine returns (this is a template, so `return` applies to
  ## the caller). If evaluating `body` or invoking the callback raises a
  ## `CatchableError`, the callback is invoked with `RET_ERR` and a message
  ## describing the failure.
  if isNil(ctx[].eventCallback):
    error eventName & " - eventCallback is nil"
    return

  foreignThreadGc:
    try:
      let event = body
      # An empty payload has no first element to take the address of;
      # indexing it would raise an IndexDefect that the handler below does
      # not catch. Hand the client a nil pointer with length 0 instead.
      let eventPtr =
        if len(event) > 0:
          unsafeAddr event[0]
        else:
          nil
      cast[CodexCallback](ctx[].eventCallback)(
        RET_OK, eventPtr, cast[csize_t](len(event)), ctx[].eventUserData
      )
    except CatchableError:
      # The message always starts with a fixed prefix, so it is never empty.
      let msg =
        "Exception " & eventName & " when calling 'eventCallBack': " &
        getCurrentExceptionMsg()
      cast[CodexCallback](ctx[].eventCallback)(
        RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
      )
|
|
|
|
proc sendRequestToCodexThread*(
    ctx: ptr CodexContext,
    reqType: RequestType,
    reqContent: pointer,
    callback: CodexCallback,
    userData: pointer,
    timeout = InfiniteDuration,
): Result[void, string] =
  ## Hands a request over to the Codex thread and waits for its
  ## acknowledgment.
  ##
  ## - `ctx`: context whose channel and signals are used.
  ## - `reqType`, `reqContent`, `callback`, `userData`: forwarded to
  ##   `CodexThreadRequest.createShared` to build the request.
  ## - `timeout`: how long to wait for `reqReceivedSignal`.
  ##
  ## Returns `ok()` once the wait for the acknowledgment finished without
  ## error, otherwise an error message. The context lock is held for the
  ## whole call, so only one request is handed over at a time.
  ctx.lock.acquire()
  defer:
    ctx.lock.release()

  let req = CodexThreadRequest.createShared(reqType, reqContent, callback, userData)

  # Send the request to the Codex thread
  if not ctx.reqChannel.trySend(req):
    # Render the request *before* releasing it; reading `req[]` after
    # `deallocShared` would be a use-after-free.
    let reqDescription = $req[]
    deallocShared(req)
    return err("Couldn't send a request to the codex thread: " & reqDescription)

  # NOTE(review): from here on `req` has already been pushed into
  # `reqChannel`. The failure paths below free it while the channel may still
  # hold the pointer - confirm the Codex thread can never dequeue it later.

  # Notify the Codex thread that a request is available
  let fireSyncRes = ctx.reqSignal.fireSync()
  if fireSyncRes.isErr():
    deallocShared(req)
    return err("failed fireSync: " & $fireSyncRes.error)

  if fireSyncRes.get() == false:
    deallocShared(req)
    return err("Couldn't fireSync in time")

  # Wait until the Codex Thread properly received the request
  # NOTE(review): only the error case is checked; if `waitSync` reports an
  # elapsed timeout as a non-error value this proc still returns `ok()` -
  # confirm that is intended.
  let res = ctx.reqReceivedSignal.waitSync(timeout)
  if res.isErr():
    deallocShared(req)
    return err("Couldn't receive reqReceivedSignal signal")

  ## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the
  ## process proc. See the 'codex_thread_request.nim' module for more details.
  ok()
|
|
|
|
proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} =
  ## Request loop of the Codex thread.
  ##
  ## Each iteration waits for `reqSignal`, leaves the loop when `running` has
  ## been cleared, otherwise pops one request from `reqChannel`, spawns its
  ## processing asynchronously and acknowledges it through
  ## `reqReceivedSignal`.
  var codex: CodexServer

  while true:
    try:
      # Wait until a request is available
      await ctx.reqSignal.wait()
    except CatchableError as e:
      # Only recoverable errors are handled here; Defects signal programming
      # errors and must not be silently swallowed by the loop.
      error "codex thread error while waiting for reqSignal", error = e.msg
      continue

    # If codex_destroy was called, exit the loop
    if not ctx.running.load():
      break

    var request: ptr CodexThreadRequest

    # Pop a request from the channel
    if not ctx.reqChannel.tryRecv(request):
      error "codex thread could not receive a request"
      continue

    # Dispatch the request to be processed asynchronously
    asyncSpawn CodexThreadRequest.process(request, addr codex)

    # Notify the main thread that we picked up the request
    let fireRes = ctx.reqReceivedSignal.fireSync()
    if fireRes.isErr():
      error "could not fireSync back to requester thread", error = fireRes.error
|
|
|
|
proc run(ctx: ptr CodexContext) {.thread.} =
  ## Thread entry point: blocks until `runCodex` has completed.
  let requestLoop = runCodex(ctx)
  waitFor requestLoop
|
|
|
|
proc createCodexContext*(): Result[ptr CodexContext, string] =
  ## This proc is called from the main thread and it creates
  ## the Codex working thread.
  ##
  ## Returns the shared-memory context on success. On failure an error
  ## message is returned and everything acquired so far (signals, lock and
  ## the context allocation itself) is released again.

  # Allocates a CodexContext in shared memory (for the main thread)
  let ctx = createShared(CodexContext, 1)

  # This signal is used by the main side to wake the Codex thread
  # when a new request is enqueued.
  ctx.reqSignal = ThreadSignalPtr.new().valueOr:
    freeShared(ctx)
    return err("couldn't create reqSignal ThreadSignalPtr")

  # Used to let the caller know that the Codex thread has
  # acknowledged / picked up a request (like a handshake).
  ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
    # Best-effort cleanup: the creation failure is the error worth reporting.
    discard ctx.reqSignal.close()
    freeShared(ctx)
    return err("couldn't create reqReceivedSignal ThreadSignalPtr")

  # Protects shared state inside CodexContext
  ctx.lock.initLock()

  # Codex thread will loop until codex_destroy is called
  ctx.running.store(true)

  try:
    createThread(ctx.thread, run, ctx)
  except ValueError, ResourceExhaustedError:
    # Capture the message first, then undo every step performed above.
    let errMsg = "failed to create the Codex thread: " & getCurrentExceptionMsg()
    ctx.lock.deinitLock()
    discard ctx.reqReceivedSignal.close()
    discard ctx.reqSignal.close()
    freeShared(ctx)
    return err(errMsg)

  return ok(ctx)
|
|
|
|
proc destroyCodexContext*(ctx: ptr CodexContext): Result[void, string] =
  ## Stops the Codex thread and releases the context.
  ##
  ## Returns an error if the thread could not be signalled; in that case
  ## nothing is released because the thread has not been joined. Once the
  ## thread is joined, the lock, both signals and the context allocation are
  ## always released, and the first error reported while closing a signal
  ## (if any) is returned. `ctx` must not be used after that point.

  # Signal the Codex thread to stop
  ctx.running.store(false)

  # Wake the worker up if it's waiting
  let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
    return err("error in destroyCodexContext: " & $error)

  if not signaledOnTime:
    return err("failed to signal reqSignal on time in destroyCodexContext")

  # Wait for the thread to finish
  joinThread(ctx.thread)

  # Clean up. Both signals are closed and the context is freed even when a
  # close fails, so that one failure does not leak the remaining resources.
  ctx.lock.deinitLock()
  let
    reqSignalClosed = ctx.reqSignal.close()
    reqReceivedSignalClosed = ctx.reqReceivedSignal.close()
  freeShared(ctx)

  ?reqSignalClosed
  ?reqReceivedSignalClosed

  return ok()
|