mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-05-17 07:39:34 +00:00
fix C++ "async" methods spawn a thread per call that then blocks on a condvar (cpp.nim:362–377). std::async(std::launch::async, ...) forces a fresh thread, and the body runs
the blocking ffi_call_ which waits on a condvar with the user's timeout (default 30s). Under load this is a thread-explosion factory, and the name "async" is misleading — the Rust side has real async via tokio oneshot, but the C++ side has fake async. If true async isn't reachable in C++ without coroutines, fine, but at least pool the threads or document this is just a convenience wrapper.
This commit is contained in:
parent
8479fb8ad3
commit
7764b2f43b
@ -8,6 +8,7 @@
|
||||
#include <memory>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
#include <optional>
|
||||
#include <nlohmann/json.hpp>
|
||||
@ -148,10 +149,63 @@ inline std::string ffi_call_(std::function<int(FfiCallback, void*)> f,
|
||||
return state->msg;
|
||||
}
|
||||
|
||||
// True-async helpers: std::promise<T> + std::future<T> mirror the Rust
|
||||
// tokio::sync::oneshot design -- the FFI callback completes the promise
|
||||
// directly, so the returned future becomes ready without ever blocking a
|
||||
// thread. The C ABI trampoline cannot be a template, so the per-T completion
|
||||
// logic is type-erased into a std::function held inside FfiAsyncState_.
|
||||
struct FfiAsyncState_ {
|
||||
std::function<void(int, const char*)> complete;
|
||||
};
|
||||
|
||||
inline void ffi_cb_async_(int ret, const char* msg, size_t /*len*/, void* ud) {
|
||||
auto* state = static_cast<FfiAsyncState_*>(ud);
|
||||
state->complete(ret, msg);
|
||||
delete state;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
inline std::future<T> ffi_call_async_(std::function<int(FfiCallback, void*)> f) {
|
||||
auto promise = std::make_shared<std::promise<T>>();
|
||||
auto future = promise->get_future();
|
||||
auto* state = new FfiAsyncState_{
|
||||
[promise](int ret, const char* msg) {
|
||||
const std::string s = msg ? std::string(msg) : std::string{};
|
||||
try {
|
||||
if (ret == 0) {
|
||||
if constexpr (std::is_same_v<T, std::string>) {
|
||||
promise->set_value(s);
|
||||
} else if constexpr (std::is_same_v<T, void*>) {
|
||||
promise->set_value(deserializeFfiResult<void*>(s));
|
||||
} else {
|
||||
promise->set_value(deserializeFfiResult<T>(s));
|
||||
}
|
||||
} else {
|
||||
promise->set_exception(std::make_exception_ptr(std::runtime_error(s)));
|
||||
}
|
||||
} catch (...) {
|
||||
promise->set_exception(std::current_exception());
|
||||
}
|
||||
}
|
||||
};
|
||||
const int ret = f(ffi_cb_async_, state);
|
||||
if (ret == 2) {
|
||||
delete state;
|
||||
throw std::runtime_error("RET_MISSING_CALLBACK (internal error)");
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
// ============================================================
|
||||
// High-level C++ context class
|
||||
//
|
||||
// Async methods (createAsync / <name>Async) return a std::future<T>
|
||||
// that becomes ready when the Nim callback fires. No thread is
|
||||
// spawned for the wait: the FFI callback completes the underlying
|
||||
// std::promise directly, mirroring the Rust tokio::oneshot path.
|
||||
// Apply timeouts via future.wait_for(...) on the caller's side.
|
||||
// ============================================================
|
||||
|
||||
class NimTimerCtx {
|
||||
@ -170,7 +224,30 @@ public:
|
||||
}
|
||||
|
||||
static std::future<NimTimerCtx> createAsync(const TimerConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
|
||||
return std::async(std::launch::async, [config, timeout]() { return create(config, timeout); });
|
||||
const auto config_json = serializeFfiArg(config);
|
||||
auto promise = std::make_shared<std::promise<NimTimerCtx>>();
|
||||
auto future = promise->get_future();
|
||||
auto* state = new FfiAsyncState_{
|
||||
[promise, timeout](int ret, const char* msg) {
|
||||
const std::string s = msg ? std::string(msg) : std::string{};
|
||||
try {
|
||||
if (ret == 0) {
|
||||
const auto addr = std::stoull(s);
|
||||
promise->set_value(NimTimerCtx(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout));
|
||||
} else {
|
||||
promise->set_exception(std::make_exception_ptr(std::runtime_error(s)));
|
||||
}
|
||||
} catch (...) {
|
||||
promise->set_exception(std::current_exception());
|
||||
}
|
||||
}
|
||||
};
|
||||
const int ret = nimtimer_create(config_json.c_str(), ffi_cb_async_, state);
|
||||
if (ret == 2) {
|
||||
delete state;
|
||||
throw std::runtime_error("RET_MISSING_CALLBACK (internal error)");
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
~NimTimerCtx() {
|
||||
@ -205,7 +282,10 @@ public:
|
||||
}
|
||||
|
||||
std::future<EchoResponse> echoAsync(const EchoRequest& req) const {
|
||||
return std::async(std::launch::async, [this, req]() { return echo(req); });
|
||||
const auto req_json = serializeFfiArg(req);
|
||||
return ffi_call_async_<EchoResponse>([&](FfiCallback cb, void* ud) {
|
||||
return nimtimer_echo(ptr_, cb, ud, req_json.c_str());
|
||||
});
|
||||
}
|
||||
|
||||
std::string version() const {
|
||||
@ -216,7 +296,9 @@ public:
|
||||
}
|
||||
|
||||
std::future<std::string> versionAsync() const {
|
||||
return std::async(std::launch::async, [this]() { return version(); });
|
||||
return ffi_call_async_<std::string>([&](FfiCallback cb, void* ud) {
|
||||
return nimtimer_version(ptr_, cb, ud);
|
||||
});
|
||||
}
|
||||
|
||||
ComplexResponse complex(const ComplexRequest& req) const {
|
||||
@ -228,7 +310,10 @@ public:
|
||||
}
|
||||
|
||||
std::future<ComplexResponse> complexAsync(const ComplexRequest& req) const {
|
||||
return std::async(std::launch::async, [this, req]() { return complex(req); });
|
||||
const auto req_json = serializeFfiArg(req);
|
||||
return ffi_call_async_<ComplexResponse>([&](FfiCallback cb, void* ud) {
|
||||
return nimtimer_complex(ptr_, cb, ud, req_json.c_str());
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@ -57,6 +57,7 @@ proc generateCppHeader*(
|
||||
lines.add("#include <memory>")
|
||||
lines.add("#include <functional>")
|
||||
lines.add("#include <future>")
|
||||
lines.add("#include <type_traits>")
|
||||
lines.add("#include <vector>")
|
||||
lines.add("#include <optional>")
|
||||
lines.add("#include <nlohmann/json.hpp>")
|
||||
@ -220,6 +221,60 @@ proc generateCppHeader*(
|
||||
lines.add(" return state->msg;")
|
||||
lines.add("}")
|
||||
lines.add("")
|
||||
|
||||
# ── True-async helpers ────────────────────────────────────────────────────
|
||||
# std::promise<T> + std::future<T> mirror the Rust tokio::sync::oneshot
|
||||
# design: the FFI callback completes the promise directly, so the returned
|
||||
# future becomes ready without ever blocking a thread.
|
||||
#
|
||||
# Type erasure: the C ABI callback (ffi_cb_async_) cannot be a template,
|
||||
# so the per-T completion logic is stored in a std::function held inside
|
||||
# FfiAsyncState_. The trampoline just dispatches to it and frees the state.
|
||||
lines.add("struct FfiAsyncState_ {")
|
||||
lines.add(" std::function<void(int, const char*)> complete;")
|
||||
lines.add("};")
|
||||
lines.add("")
|
||||
lines.add("inline void ffi_cb_async_(int ret, const char* msg, size_t /*len*/, void* ud) {")
|
||||
lines.add(" auto* state = static_cast<FfiAsyncState_*>(ud);")
|
||||
lines.add(" state->complete(ret, msg);")
|
||||
lines.add(" delete state;")
|
||||
lines.add("}")
|
||||
lines.add("")
|
||||
# Generic per-T helper. JSON deserialization happens in the callback (the
|
||||
# same Nim/chronos thread that already runs the blocking path's notify);
|
||||
# this keeps the returned future correctly waitable via wait_for/wait_until.
|
||||
lines.add("template<typename T>")
|
||||
lines.add("inline std::future<T> ffi_call_async_(std::function<int(FfiCallback, void*)> f) {")
|
||||
lines.add(" auto promise = std::make_shared<std::promise<T>>();")
|
||||
lines.add(" auto future = promise->get_future();")
|
||||
lines.add(" auto* state = new FfiAsyncState_{")
|
||||
lines.add(" [promise](int ret, const char* msg) {")
|
||||
lines.add(" const std::string s = msg ? std::string(msg) : std::string{};")
|
||||
lines.add(" try {")
|
||||
lines.add(" if (ret == 0) {")
|
||||
lines.add(" if constexpr (std::is_same_v<T, std::string>) {")
|
||||
lines.add(" promise->set_value(s);")
|
||||
lines.add(" } else if constexpr (std::is_same_v<T, void*>) {")
|
||||
lines.add(" promise->set_value(deserializeFfiResult<void*>(s));")
|
||||
lines.add(" } else {")
|
||||
lines.add(" promise->set_value(deserializeFfiResult<T>(s));")
|
||||
lines.add(" }")
|
||||
lines.add(" } else {")
|
||||
lines.add(" promise->set_exception(std::make_exception_ptr(std::runtime_error(s)));")
|
||||
lines.add(" }")
|
||||
lines.add(" } catch (...) {")
|
||||
lines.add(" promise->set_exception(std::current_exception());")
|
||||
lines.add(" }")
|
||||
lines.add(" }")
|
||||
lines.add(" };")
|
||||
lines.add(" const int ret = f(ffi_cb_async_, state);")
|
||||
lines.add(" if (ret == 2) {")
|
||||
lines.add(" delete state;")
|
||||
lines.add(" throw std::runtime_error(\"RET_MISSING_CALLBACK (internal error)\");")
|
||||
lines.add(" }")
|
||||
lines.add(" return future;")
|
||||
lines.add("}")
|
||||
lines.add("")
|
||||
lines.add("} // anonymous namespace")
|
||||
lines.add("")
|
||||
|
||||
@ -238,6 +293,12 @@ proc generateCppHeader*(
|
||||
|
||||
lines.add("// ============================================================")
|
||||
lines.add("// High-level C++ context class")
|
||||
lines.add("//")
|
||||
lines.add("// Async methods (createAsync / <name>Async) return a std::future<T>")
|
||||
lines.add("// that becomes ready when the Nim callback fires. No thread is")
|
||||
lines.add("// spawned for the wait: the FFI callback completes the underlying")
|
||||
lines.add("// std::promise directly, mirroring the Rust tokio::oneshot path.")
|
||||
lines.add("// Apply timeouts via future.wait_for(...) on the caller's side.")
|
||||
lines.add("// ============================================================")
|
||||
lines.add("")
|
||||
lines.add("class $1 {" % [ctxTypeName])
|
||||
@ -281,21 +342,45 @@ proc generateCppHeader*(
|
||||
lines.add(" }")
|
||||
lines.add("")
|
||||
|
||||
# -- createAsync() factory: uses actual param types, not hardcoded --
|
||||
let captureList =
|
||||
if epNames.len > 0: epNames.join(", ") & ", timeout"
|
||||
else: "timeout"
|
||||
let callList =
|
||||
if epNames.len > 0: epNames.join(", ") & ", timeout"
|
||||
else: "timeout"
|
||||
# -- createAsync() factory: true async via std::promise; no thread is
|
||||
# spawned, the FFI callback constructs the Ctx and completes the promise.
|
||||
lines.add(
|
||||
" static std::future<$1> createAsync($2) {" %
|
||||
[ctxTypeName, ctorParamsWithTimeout]
|
||||
)
|
||||
for ep in ctor.extraParams:
|
||||
lines.add(" const auto $1_json = serializeFfiArg($1);" % [ep.name])
|
||||
lines.add(" auto promise = std::make_shared<std::promise<$1>>();" % [ctxTypeName])
|
||||
lines.add(" auto future = promise->get_future();")
|
||||
lines.add(" auto* state = new FfiAsyncState_{")
|
||||
lines.add(" [promise, timeout](int ret, const char* msg) {")
|
||||
lines.add(" const std::string s = msg ? std::string(msg) : std::string{};")
|
||||
lines.add(" try {")
|
||||
lines.add(" if (ret == 0) {")
|
||||
lines.add(" const auto addr = std::stoull(s);")
|
||||
lines.add(
|
||||
" return std::async(std::launch::async, [$1]() { return create($2); });" %
|
||||
[captureList, callList]
|
||||
" promise->set_value($1(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout));" %
|
||||
[ctxTypeName]
|
||||
)
|
||||
lines.add(" } else {")
|
||||
lines.add(" promise->set_exception(std::make_exception_ptr(std::runtime_error(s)));")
|
||||
lines.add(" }")
|
||||
lines.add(" } catch (...) {")
|
||||
lines.add(" promise->set_exception(std::current_exception());")
|
||||
lines.add(" }")
|
||||
lines.add(" }")
|
||||
lines.add(" };")
|
||||
var ctorCallArgs: seq[string] = @[]
|
||||
for ep in ctor.extraParams:
|
||||
ctorCallArgs.add("$1_json.c_str()" % [ep.name])
|
||||
ctorCallArgs.add("ffi_cb_async_")
|
||||
ctorCallArgs.add("state")
|
||||
lines.add(" const int ret = $1($2);" % [ctor.procName, ctorCallArgs.join(", ")])
|
||||
lines.add(" if (ret == 2) {")
|
||||
lines.add(" delete state;")
|
||||
lines.add(" throw std::runtime_error(\"RET_MISSING_CALLBACK (internal error)\");")
|
||||
lines.add(" }")
|
||||
lines.add(" return future;")
|
||||
lines.add(" }")
|
||||
lines.add("")
|
||||
|
||||
@ -334,12 +419,9 @@ proc generateCppHeader*(
|
||||
let retCppType = nimTypeToCpp(m.returnTypeName)
|
||||
|
||||
var methParams: seq[string] = @[]
|
||||
var methParamNames: seq[string] = @[]
|
||||
for ep in m.extraParams:
|
||||
methParams.add("const $1& $2" % [nimTypeToCpp(ep.typeName), ep.name])
|
||||
methParamNames.add(ep.name)
|
||||
let methParamsStr = methParams.join(", ")
|
||||
let methParamNamesStr = methParamNames.join(", ")
|
||||
|
||||
lines.add(" $1 $2($3) const {" % [retCppType, methodName, methParamsStr])
|
||||
for ep in m.extraParams:
|
||||
@ -356,23 +438,21 @@ proc generateCppHeader*(
|
||||
lines.add(" return deserializeFfiResult<$1>(raw);" % [retCppType])
|
||||
lines.add(" }")
|
||||
lines.add("")
|
||||
if methParamsStr.len > 0:
|
||||
lines.add(
|
||||
" std::future<$1> $2Async($3) const {" %
|
||||
[retCppType, methodName, methParamsStr]
|
||||
)
|
||||
lines.add(
|
||||
" return std::async(std::launch::async, [this, $1]() { return $2($3); });" %
|
||||
[methParamNamesStr, methodName, methParamNamesStr]
|
||||
)
|
||||
lines.add(" }")
|
||||
else:
|
||||
lines.add(" std::future<$1> $2Async() const {" % [retCppType, methodName])
|
||||
lines.add(
|
||||
" return std::async(std::launch::async, [this]() { return $1(); });" %
|
||||
[methodName]
|
||||
)
|
||||
lines.add(" }")
|
||||
# -- <method>Async: true async via std::promise; the FFI callback
|
||||
# completes the promise on the Nim thread. No std::thread is spawned.
|
||||
lines.add(
|
||||
" std::future<$1> $2Async($3) const {" %
|
||||
[retCppType, methodName, methParamsStr]
|
||||
)
|
||||
for ep in m.extraParams:
|
||||
lines.add(" const auto $1_json = serializeFfiArg($1);" % [ep.name])
|
||||
lines.add(" return ffi_call_async_<$1>([&](FfiCallback cb, void* ud) {" % [retCppType])
|
||||
var asyncCallArgs = @["ptr_", "cb", "ud"]
|
||||
for ep in m.extraParams:
|
||||
asyncCallArgs.add("$1_json.c_str()" % [ep.name])
|
||||
lines.add(" return $1($2);" % [m.procName, asyncCallArgs.join(", ")])
|
||||
lines.add(" });")
|
||||
lines.add(" }")
|
||||
lines.add("")
|
||||
|
||||
lines.add("private:")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user