Merge branch 'master' into chore/ffi-context-lifecycle

This commit is contained in:
Gabriel Cruz 2026-06-03 15:37:23 -03:00 committed by GitHub
commit 9971902fa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1019 additions and 805 deletions

View File

@ -1,10 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(echo_cpp_bindings CXX C)
# The generated bindings target C++20. The event-listener API uses
# std::span<const std::uint8_t> on its wildcard callback to hand the
# CBOR envelope to consumers as a zero-copy view; <span> only became
# part of the standard library in C++20.
# The generated bindings target C++20: designated initializers and other
# C++20 constructs are used throughout the emitted code.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -1,6 +1,6 @@
#pragma once
// Generated bindings require C++20 — the event-listener API uses
// std::span<const std::uint8_t> for the wildcard callback.
// Generated bindings require C++20 (designated initializers and other
// C++20 constructs are used throughout the emitted code).
// MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is passed,
// so consult _MSVC_LANG when present (it always reflects the active
// /std:c++XX level).
@ -14,7 +14,7 @@
#include <string>
#include <cstdint>
#include <chrono>
#include <stdexcept>
#include <charconv>
#include <mutex>
#include <condition_variable>
#include <memory>
@ -24,10 +24,73 @@
#include <optional>
#include <type_traits>
#include <cstring>
#include <cassert>
extern "C" {
#include <tinycbor/cbor.h>
}
// ============================================================
// Result<T> — exception-free error channel
// ============================================================
// The generated bindings never throw: every fallible entry point (create,
// instance methods, and their *Async futures) returns a Result<T>. Callers
// branch on isOk()/isErr() (or the explicit bool conversion) and read
// value()/error(). This mirrors the Nim side's Result[T, string] and keeps
// us off C++23's std::expected.
#ifndef NIM_FFI_RESULT_HPP_INCLUDED
#define NIM_FFI_RESULT_HPP_INCLUDED
template <typename T>
class Result {
std::optional<T> value_;
std::string error_;
public:
static Result<T> ok(T value) {
Result<T> r;
r.value_ = std::move(value);
return r;
}
static Result<T> err(std::string message) {
Result<T> r;
r.error_ = std::move(message);
return r;
}
bool isOk() const { return value_.has_value(); }
bool isErr() const { return !value_.has_value(); }
explicit operator bool() const { return isOk(); }
const T& value() const { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
T& value() { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
const T& operator*() const { assert(value_.has_value() && "Result::operator*() called on err Result — check isOk() first"); return *value_; }
const T* operator->() const { assert(value_.has_value() && "Result::operator->() called on err Result — check isOk() first"); return &*value_; }
T&& take() { assert(value_.has_value() && "Result::take() called on err Result — check isOk() first"); return std::move(*value_); }
const std::string& error() const { assert(!value_.has_value() && "Result::error() called on ok Result — check isErr() first"); return error_; }
};
template <>
class Result<void> {
bool ok_ = true;
std::string error_;
public:
static Result<void> ok() {
Result<void> r;
r.ok_ = true;
return r;
}
static Result<void> err(std::string message) {
Result<void> r;
r.ok_ = false;
r.error_ = std::move(message);
return r;
}
Result() = default;
bool isOk() const { return ok_; }
bool isErr() const { return !ok_; }
explicit operator bool() const { return isOk(); }
const std::string& error() const { assert(!ok_ && "Result<void>::error() called on ok Result — check isErr() first"); return error_; }
};
#endif // NIM_FFI_RESULT_HPP_INCLUDED
// ── encode_cbor overloads (primitives + containers) ─────────────────────
// Per-struct encode_cbor / decode_cbor are emitted by cpp.nim next to each
// generated struct; these helpers cover the leaf types they defer into.
@ -159,7 +222,7 @@ inline CborError decode_cbor(CborValue& it, std::optional<T>& out) {
// ── Public entry points ─────────────────────────────────────────────────
template<typename T>
inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
inline Result<std::vector<std::uint8_t>> encodeCborFFI(const T& value) {
// Start with a generous 4 KiB buffer; double on overflow until it fits.
std::vector<std::uint8_t> buf(4096);
while (true) {
@ -169,34 +232,34 @@ inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
if (err == CborNoError) {
const size_t used = cbor_encoder_get_buffer_size(&enc, buf.data());
buf.resize(used);
return buf;
return Result<std::vector<std::uint8_t>>::ok(std::move(buf));
}
if (err == CborErrorOutOfMemory) {
const size_t extra = cbor_encoder_get_extra_bytes_needed(&enc);
buf.resize(buf.size() + (extra > 0 ? extra : buf.size()));
continue;
}
throw std::runtime_error(std::string("FFI CBOR encode failed: ") +
cbor_error_string(err));
return Result<std::vector<std::uint8_t>>::err(
std::string("FFI CBOR encode failed: ") + cbor_error_string(err));
}
}
template<typename T>
inline T decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
inline Result<T> decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
CborParser parser;
CborValue it;
CborError err = cbor_parser_init(bytes.data(), bytes.size(), 0, &parser, &it);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
}
T out{};
err = decode_cbor(it, out);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
}
return out;
return Result<T>::ok(std::move(out));
}
#endif // NIM_FFI_CBOR_HELPERS_HPP_INCLUDED
@ -384,22 +447,25 @@ inline void ffi_cb_(int ret, const char* msg, size_t len, void* ud) {
s.cv.notify_one();
}
inline std::vector<std::uint8_t> ffi_call_(std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
inline Result<std::vector<std::uint8_t>> ffi_call_(
std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
using Bytes = std::vector<std::uint8_t>;
auto state = std::make_shared<FFICallState_>();
auto* cb_ref = new std::shared_ptr<FFICallState_>(state);
const int ret = f(ffi_cb_, cb_ref);
if (ret == 2) {
delete cb_ref;
throw std::runtime_error("RET_MISSING_CALLBACK (internal error)");
return Result<Bytes>::err("RET_MISSING_CALLBACK (internal error)");
}
std::unique_lock<std::mutex> lock(state->mtx);
const bool fired = state->cv.wait_for(lock, timeout, [&]{ return state->done; });
if (!fired)
throw std::runtime_error("FFI call timed out after " + std::to_string(timeout.count()) + "ms");
return Result<Bytes>::err("FFI call timed out after " +
std::to_string(timeout.count()) + "ms");
if (!state->ok)
throw std::runtime_error(state->err);
return state->bytes;
return Result<Bytes>::err(state->err);
return Result<Bytes>::ok(std::move(state->bytes));
}
} // anonymous namespace
@ -412,23 +478,30 @@ inline std::vector<std::uint8_t> ffi_call_(std::function<int(FFICallback, void*)
class EchoCtx {
public:
static std::unique_ptr<EchoCtx> create(const EchoConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
static Result<std::unique_ptr<EchoCtx>> create(const EchoConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
const auto ffi_req_ = EchoCreateCtorReq{config};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<std::unique_ptr<EchoCtx>>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
(void)echo_create(ffi_req_bytes_.data(), ffi_req_bytes_.size(), cb, ud);
return 0;
}, timeout);
const auto addr_str = decodeCborFFI<std::string>(ffi_raw_);
try {
const auto addr = std::stoull(addr_str);
return std::unique_ptr<EchoCtx>(new EchoCtx(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout));
} catch (const std::exception&) {
throw std::runtime_error("FFI create returned non-numeric address: " + addr_str);
if (ffi_raw_.isErr()) return Result<std::unique_ptr<EchoCtx>>::err(ffi_raw_.error());
auto ffi_addr_ = decodeCborFFI<std::string>(ffi_raw_.value());
if (ffi_addr_.isErr()) return Result<std::unique_ptr<EchoCtx>>::err(ffi_addr_.error());
const auto& addr_str = ffi_addr_.value();
std::uint64_t addr = 0;
const char* addr_begin = addr_str.data();
const char* addr_end = addr_begin + addr_str.size();
const auto fc_ = std::from_chars(addr_begin, addr_end, addr);
if (fc_.ec != std::errc() || fc_.ptr != addr_end) {
return Result<std::unique_ptr<EchoCtx>>::err("FFI create returned non-numeric address: " + addr_str);
}
return Result<std::unique_ptr<EchoCtx>>::ok(std::unique_ptr<EchoCtx>(new EchoCtx(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout)));
}
static std::future<std::unique_ptr<EchoCtx>> createAsync(const EchoConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
static std::future<Result<std::unique_ptr<EchoCtx>>> createAsync(const EchoConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
return std::async(std::launch::async, [config, timeout]() { return create(config, timeout); });
}
@ -453,29 +526,35 @@ public:
EchoCtx(EchoCtx&&) = delete;
EchoCtx& operator=(EchoCtx&&) = delete;
ShoutResponse shout(const ShoutRequest& req) const {
Result<ShoutResponse> shout(const ShoutRequest& req) const {
const auto ffi_req_ = EchoShoutReq{req};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<ShoutResponse>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return echo_shout(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<ShoutResponse>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<ShoutResponse>::err(ffi_raw_.error());
return decodeCborFFI<ShoutResponse>(ffi_raw_.value());
}
std::future<ShoutResponse> shoutAsync(const ShoutRequest& req) const {
std::future<Result<ShoutResponse>> shoutAsync(const ShoutRequest& req) const {
return std::async(std::launch::async, [this, req]() { return this->shout(req); });
}
std::string version() const {
Result<std::string> version() const {
const auto ffi_req_ = EchoVersionReq{};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<std::string>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return echo_version(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<std::string>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<std::string>::err(ffi_raw_.error());
return decodeCborFFI<std::string>(ffi_raw_.value());
}
std::future<std::string> versionAsync() const {
std::future<Result<std::string>> versionAsync() const {
return std::async(std::launch::async, [this]() { return this->version(); });
}

View File

@ -1,10 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project(my_timer_cpp_bindings CXX C)
# The generated bindings target C++20. The event-listener API uses
# std::span<const std::uint8_t> on its wildcard callback to hand the
# CBOR envelope to consumers as a zero-copy view; <span> only became
# part of the standard library in C++20.
# The generated bindings target C++20: designated initializers and other
# C++20 constructs are used throughout the emitted code.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -5,121 +5,113 @@
#include <iostream>
#include <thread>
// The generated bindings never throw: every call returns a Result<T>. We
// branch on isErr() and read value()/error() instead of using try/catch.
int main() {
try {
auto ctx = MyTimerCtx::create(TimerConfig{"cpp-demo"});
std::cout << "[1] Context created\n";
auto versionFuture = ctx->versionAsync();
auto echo1Future = ctx->echoAsync(EchoRequest{"hello from C++", 200});
auto echo2Future = ctx->echoAsync(EchoRequest{"second C++ request", 50});
auto version = versionFuture.get();
std::cout << "[2] Version: " << version << "\n";
auto echo = echo1Future.get();
std::cout << "[3] Echo 1: echoed=" << echo.echoed
<< ", timerName=" << echo.timerName << "\n";
auto echo2 = echo2Future.get();
std::cout << "[4] Echo 2: echoed=" << echo2.echoed
<< ", timerName=" << echo2.timerName << "\n";
auto complexReq = ComplexRequest{
std::vector<EchoRequest>{EchoRequest{"one", 10}, EchoRequest{"two", 20}},
std::vector<std::string>{"fast", "async"},
std::optional<std::string>("extra note"),
std::optional<int64_t>(3)
};
auto complexFuture = ctx->complexAsync(complexReq);
auto complex = complexFuture.get();
std::cout << "[5] Complex: summary=" << complex.summary
<< ", itemCount=" << complex.itemCount
<< ", hasNote=" << complex.hasNote << "\n";
// Each parameter is its own generated C++ struct. The nim-ffi
// macro packs all three into one CBOR envelope on the wire — at
// the call site, this is just a typed method invocation.
auto job = JobSpec{
/*name*/ "nightly-rollup",
/*payload*/ std::vector<std::string>{"rollup", "v2"},
/*priority*/ 10,
};
auto retry = RetryPolicy{
/*maxAttempts*/ 3,
/*backoffMs*/ 500,
/*retryOn*/ std::vector<std::string>{"timeout", "5xx"},
};
auto schedule = ScheduleConfig{
/*startAtMs*/ 1000,
/*intervalMs*/ 15000,
/*jitter*/ std::optional<int64_t>(250),
};
auto scheduleFuture = ctx->scheduleAsync(job, retry, schedule);
auto scheduleRes = scheduleFuture.get();
std::cout << "[6] Schedule (3 complex params): jobId=" << scheduleRes.jobId
<< ", willRunCount=" << scheduleRes.willRunCount
<< ", firstRunAtMs=" << scheduleRes.firstRunAtMs
<< ", effectiveBackoffMs=" << scheduleRes.effectiveBackoffMs
<< "\n";
// Each `{.ffiEvent.}` declared on the Nim side gets a typed
// registration method — `addOnEchoFiredListener(handler)` here.
// A second `addEventListener` overload registers a catch-all
// wildcard listener that receives every event as raw envelope
// bytes plus the FFI return code. Both fire from the lib's
// dispatch thread, so synchronise via std::promise / atomics.
std::promise<EchoEvent> echoEvtPromise;
auto echoEvtFuture = echoEvtPromise.get_future();
const auto typedHandle = ctx->addOnEchoFiredListener(
[&](const EchoEvent& evt) { echoEvtPromise.set_value(evt); });
std::atomic<int> wildcardHits{0};
// Wildcard listener receives every event with the wire `eventId`
// pre-extracted plus a span view over the raw CBOR envelope
// bytes (zero-copy; valid only for the duration of this call).
// Dispatch on `eventId` and use `decodeEventPayload<T>` to lift
// the payload into a typed value without hand-parsing CBOR.
const auto wildcardHandle = ctx->addEventListener(
[&](int retCode, const std::string& eventId,
std::span<const std::uint8_t> envelope) {
wildcardHits.fetch_add(1);
std::cout << "[7] wildcard event: retCode=" << retCode
<< ", eventId=" << eventId
<< ", envelope bytes=" << envelope.size() << "\n";
if (retCode != 0) return;
if (eventId == "on_echo_fired") {
EchoEvent decoded{};
if (decodeEventPayload(envelope, decoded)) {
std::cout << " decoded EchoEvent: message="
<< decoded.message
<< ", echoCount=" << decoded.echoCount << "\n";
}
}
});
ctx->echo(EchoRequest{"event-demo", 1});
const auto evt = echoEvtFuture.get();
std::cout << "[7] typed event onEchoFired: message=" << evt.message
<< ", echoCount=" << evt.echoCount
<< ", wildcardHits=" << wildcardHits.load() << "\n";
// Drop the typed listener — only the wildcard fires for the
// follow-up echo. Sleep briefly to give the lib thread time to
// deliver before we tear the ctx down.
ctx->removeEventListener(typedHandle);
ctx->echo(EchoRequest{"event-demo-after-remove", 1});
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << "[7] after removeEventListener: wildcardHits="
<< wildcardHits.load() << "\n";
ctx->removeEventListener(wildcardHandle);
std::cout << "\nDone.\n";
} catch (const std::exception& ex) {
std::cerr << "Error: " << ex.what() << "\n";
auto ctxRes = MyTimerCtx::create(TimerConfig{"cpp-demo"});
if (ctxRes.isErr()) {
std::cerr << "Error: " << ctxRes.error() << "\n";
return 1;
}
auto ctx = std::move(ctxRes.value());
std::cout << "[1] Context created\n";
auto versionFuture = ctx->versionAsync();
auto echo1Future = ctx->echoAsync(EchoRequest{"hello from C++", 200});
auto echo2Future = ctx->echoAsync(EchoRequest{"second C++ request", 50});
auto version = versionFuture.get();
if (version.isErr()) {
std::cerr << "Error: " << version.error() << "\n";
return 1;
}
std::cout << "[2] Version: " << version.value() << "\n";
auto echo = echo1Future.get();
if (echo.isErr()) {
std::cerr << "Error: " << echo.error() << "\n";
return 1;
}
std::cout << "[3] Echo 1: echoed=" << echo->echoed
<< ", timerName=" << echo->timerName << "\n";
auto echo2 = echo2Future.get();
if (echo2.isErr()) {
std::cerr << "Error: " << echo2.error() << "\n";
return 1;
}
std::cout << "[4] Echo 2: echoed=" << echo2->echoed
<< ", timerName=" << echo2->timerName << "\n";
auto complexReq = ComplexRequest{
std::vector<EchoRequest>{EchoRequest{"one", 10}, EchoRequest{"two", 20}},
std::vector<std::string>{"fast", "async"},
std::optional<std::string>("extra note"),
std::optional<int64_t>(3)
};
auto complex = ctx->complexAsync(complexReq).get();
if (complex.isErr()) {
std::cerr << "Error: " << complex.error() << "\n";
return 1;
}
std::cout << "[5] Complex: summary=" << complex->summary
<< ", itemCount=" << complex->itemCount
<< ", hasNote=" << complex->hasNote << "\n";
// ── 6. Call with three complex parameters ─────────────────────
// Each parameter is its own generated C++ struct. The nim-ffi
// macro packs all three into one CBOR envelope on the wire — at
// the call site, this is just a typed method invocation.
auto job = JobSpec{
/*name*/ "nightly-rollup",
/*payload*/ std::vector<std::string>{"rollup", "v2"},
/*priority*/ 10,
};
auto retry = RetryPolicy{
/*maxAttempts*/ 3,
/*backoffMs*/ 500,
/*retryOn*/ std::vector<std::string>{"timeout", "5xx"},
};
auto schedule = ScheduleConfig{
/*startAtMs*/ 1000,
/*intervalMs*/ 15000,
/*jitter*/ std::optional<int64_t>(250),
};
auto scheduleRes = ctx->scheduleAsync(job, retry, schedule).get();
if (scheduleRes.isErr()) {
std::cerr << "Error: " << scheduleRes.error() << "\n";
return 1;
}
std::cout << "[6] Schedule (3 complex params): jobId=" << scheduleRes->jobId
<< ", willRunCount=" << scheduleRes->willRunCount
<< ", firstRunAtMs=" << scheduleRes->firstRunAtMs
<< ", effectiveBackoffMs=" << scheduleRes->effectiveBackoffMs
<< "\n";
// Each `{.ffiEvent.}` declared on the Nim side gets a typed
// registration method — `addOnEchoFiredListener(handler)` here.
// Subscribe to each event separately; handlers fire from the lib's
// dispatch thread, so synchronise via std::promise / atomics.
std::promise<EchoEvent> echoEvtPromise;
auto echoEvtFuture = echoEvtPromise.get_future();
const auto typedHandle = ctx->addOnEchoFiredListener(
[&](const EchoEvent& evt) { echoEvtPromise.set_value(evt); });
ctx->echo(EchoRequest{"event-demo", 1});
const auto evt = echoEvtFuture.get();
std::cout << "[7] typed event onEchoFired: message=" << evt.message
<< ", echoCount=" << evt.echoCount << "\n";
// Drop the typed listener — no handler fires for the follow-up echo.
// Sleep briefly to give the lib thread time to settle before we tear
// the ctx down.
ctx->removeEventListener(typedHandle);
ctx->echo(EchoRequest{"event-demo-after-remove", 1});
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << "[7] after removeEventListener: typed listener removed\n";
std::cout << "\nDone.\n";
return 0;
}

View File

@ -1,6 +1,6 @@
#pragma once
// Generated bindings require C++20 — the event-listener API uses
// std::span<const std::uint8_t> for the wildcard callback.
// Generated bindings require C++20 (designated initializers and other
// C++20 constructs are used throughout the emitted code).
// MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is passed,
// so consult _MSVC_LANG when present (it always reflects the active
// /std:c++XX level).
@ -14,7 +14,7 @@
#include <string>
#include <cstdint>
#include <chrono>
#include <stdexcept>
#include <charconv>
#include <mutex>
#include <condition_variable>
#include <memory>
@ -24,12 +24,74 @@
#include <optional>
#include <type_traits>
#include <cstring>
#include <cassert>
extern "C" {
#include <tinycbor/cbor.h>
}
#include <unordered_map>
#include <span>
// ============================================================
// Result<T> — exception-free error channel
// ============================================================
// The generated bindings never throw: every fallible entry point (create,
// instance methods, and their *Async futures) returns a Result<T>. Callers
// branch on isOk()/isErr() (or the explicit bool conversion) and read
// value()/error(). This mirrors the Nim side's Result[T, string] and keeps
// us off C++23's std::expected.
#ifndef NIM_FFI_RESULT_HPP_INCLUDED
#define NIM_FFI_RESULT_HPP_INCLUDED
template <typename T>
class Result {
std::optional<T> value_;
std::string error_;
public:
static Result<T> ok(T value) {
Result<T> r;
r.value_ = std::move(value);
return r;
}
static Result<T> err(std::string message) {
Result<T> r;
r.error_ = std::move(message);
return r;
}
bool isOk() const { return value_.has_value(); }
bool isErr() const { return !value_.has_value(); }
explicit operator bool() const { return isOk(); }
const T& value() const { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
T& value() { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
const T& operator*() const { assert(value_.has_value() && "Result::operator*() called on err Result — check isOk() first"); return *value_; }
const T* operator->() const { assert(value_.has_value() && "Result::operator->() called on err Result — check isOk() first"); return &*value_; }
T&& take() { assert(value_.has_value() && "Result::take() called on err Result — check isOk() first"); return std::move(*value_); }
const std::string& error() const { assert(!value_.has_value() && "Result::error() called on ok Result — check isErr() first"); return error_; }
};
template <>
class Result<void> {
bool ok_ = true;
std::string error_;
public:
static Result<void> ok() {
Result<void> r;
r.ok_ = true;
return r;
}
static Result<void> err(std::string message) {
Result<void> r;
r.ok_ = false;
r.error_ = std::move(message);
return r;
}
Result() = default;
bool isOk() const { return ok_; }
bool isErr() const { return !ok_; }
explicit operator bool() const { return isOk(); }
const std::string& error() const { assert(!ok_ && "Result<void>::error() called on ok Result — check isErr() first"); return error_; }
};
#endif // NIM_FFI_RESULT_HPP_INCLUDED
// ── encode_cbor overloads (primitives + containers) ─────────────────────
// Per-struct encode_cbor / decode_cbor are emitted by cpp.nim next to each
// generated struct; these helpers cover the leaf types they defer into.
@ -161,7 +223,7 @@ inline CborError decode_cbor(CborValue& it, std::optional<T>& out) {
// ── Public entry points ─────────────────────────────────────────────────
template<typename T>
inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
inline Result<std::vector<std::uint8_t>> encodeCborFFI(const T& value) {
// Start with a generous 4 KiB buffer; double on overflow until it fits.
std::vector<std::uint8_t> buf(4096);
while (true) {
@ -171,34 +233,34 @@ inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
if (err == CborNoError) {
const size_t used = cbor_encoder_get_buffer_size(&enc, buf.data());
buf.resize(used);
return buf;
return Result<std::vector<std::uint8_t>>::ok(std::move(buf));
}
if (err == CborErrorOutOfMemory) {
const size_t extra = cbor_encoder_get_extra_bytes_needed(&enc);
buf.resize(buf.size() + (extra > 0 ? extra : buf.size()));
continue;
}
throw std::runtime_error(std::string("FFI CBOR encode failed: ") +
cbor_error_string(err));
return Result<std::vector<std::uint8_t>>::err(
std::string("FFI CBOR encode failed: ") + cbor_error_string(err));
}
}
template<typename T>
inline T decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
inline Result<T> decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
CborParser parser;
CborValue it;
CborError err = cbor_parser_init(bytes.data(), bytes.size(), 0, &parser, &it);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
}
T out{};
err = decode_cbor(it, out);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
}
return out;
return Result<T>::ok(std::move(out));
}
#endif // NIM_FFI_CBOR_HELPERS_HPP_INCLUDED
@ -685,64 +747,61 @@ inline void ffi_cb_(int ret, const char* msg, size_t len, void* ud) {
s.cv.notify_one();
}
inline std::vector<std::uint8_t> ffi_call_(std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
inline Result<std::vector<std::uint8_t>> ffi_call_(
std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
using Bytes = std::vector<std::uint8_t>;
auto state = std::make_shared<FFICallState_>();
auto* cb_ref = new std::shared_ptr<FFICallState_>(state);
const int ret = f(ffi_cb_, cb_ref);
if (ret == 2) {
delete cb_ref;
throw std::runtime_error("RET_MISSING_CALLBACK (internal error)");
return Result<Bytes>::err("RET_MISSING_CALLBACK (internal error)");
}
std::unique_lock<std::mutex> lock(state->mtx);
const bool fired = state->cv.wait_for(lock, timeout, [&]{ return state->done; });
if (!fired)
throw std::runtime_error("FFI call timed out after " + std::to_string(timeout.count()) + "ms");
return Result<Bytes>::err("FFI call timed out after " +
std::to_string(timeout.count()) + "ms");
if (!state->ok)
throw std::runtime_error(state->err);
return state->bytes;
return Result<Bytes>::err(state->err);
return Result<Bytes>::ok(std::move(state->bytes));
}
} // anonymous namespace
#endif // NIM_FFI_SYNC_CALL_HELPER_HPP_INCLUDED
template <class T>
inline bool decodeEventPayload(std::span<const std::uint8_t> envelope, T& out) {
if (envelope.empty()) return false;
CborParser parser; CborValue it;
if (cbor_parser_init(envelope.data(), envelope.size(), 0, &parser, &it) != CborNoError)
return false;
if (!cbor_value_is_map(&it)) return false;
CborValue payloadField;
if (cbor_value_map_find_value(&it, "payload", &payloadField) != CborNoError)
return false;
return decode_cbor(payloadField, out) == CborNoError;
}
// ============================================================
// High-level C++ context class
// ============================================================
class MyTimerCtx {
public:
static std::unique_ptr<MyTimerCtx> create(const TimerConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
static Result<std::unique_ptr<MyTimerCtx>> create(const TimerConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
const auto ffi_req_ = MyTimerCreateCtorReq{config};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<std::unique_ptr<MyTimerCtx>>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
(void)my_timer_create(ffi_req_bytes_.data(), ffi_req_bytes_.size(), cb, ud);
return 0;
}, timeout);
const auto addr_str = decodeCborFFI<std::string>(ffi_raw_);
try {
const auto addr = std::stoull(addr_str);
return std::unique_ptr<MyTimerCtx>(new MyTimerCtx(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout));
} catch (const std::exception&) {
throw std::runtime_error("FFI create returned non-numeric address: " + addr_str);
if (ffi_raw_.isErr()) return Result<std::unique_ptr<MyTimerCtx>>::err(ffi_raw_.error());
auto ffi_addr_ = decodeCborFFI<std::string>(ffi_raw_.value());
if (ffi_addr_.isErr()) return Result<std::unique_ptr<MyTimerCtx>>::err(ffi_addr_.error());
const auto& addr_str = ffi_addr_.value();
std::uint64_t addr = 0;
const char* addr_begin = addr_str.data();
const char* addr_end = addr_begin + addr_str.size();
const auto fc_ = std::from_chars(addr_begin, addr_end, addr);
if (fc_.ec != std::errc() || fc_.ptr != addr_end) {
return Result<std::unique_ptr<MyTimerCtx>>::err("FFI create returned non-numeric address: " + addr_str);
}
return Result<std::unique_ptr<MyTimerCtx>>::ok(std::unique_ptr<MyTimerCtx>(new MyTimerCtx(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout)));
}
static std::future<std::unique_ptr<MyTimerCtx>> createAsync(const TimerConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
static std::future<Result<std::unique_ptr<MyTimerCtx>>> createAsync(const TimerConfig& config, std::chrono::milliseconds timeout = std::chrono::seconds{30}) {
return std::async(std::launch::async, [config, timeout]() { return create(config, timeout); });
}
@ -780,16 +839,6 @@ public:
return ListenerHandle{id};
}
ListenerHandle addEventListener(std::function<void(int, const std::string&, std::span<const std::uint8_t>)> handler) {
auto owned = std::make_unique<WildcardListener>(std::move(handler));
auto* raw = owned.get();
const auto id = my_timer_add_event_listener(
ptr_, "", &MyTimerCtx::wildcardTrampoline, raw);
if (id == 0) return ListenerHandle{0};
listeners_.emplace(id, std::move(owned));
return ListenerHandle{id};
}
bool removeEventListener(ListenerHandle handle) {
if (handle.id == 0) return false;
const auto rc = my_timer_remove_event_listener(ptr_, handle.id);
@ -797,55 +846,67 @@ public:
return rc == 0;
}
EchoResponse echo(const EchoRequest& req) const {
Result<EchoResponse> echo(const EchoRequest& req) const {
const auto ffi_req_ = MyTimerEchoReq{req};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<EchoResponse>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return my_timer_echo(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<EchoResponse>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<EchoResponse>::err(ffi_raw_.error());
return decodeCborFFI<EchoResponse>(ffi_raw_.value());
}
std::future<EchoResponse> echoAsync(const EchoRequest& req) const {
std::future<Result<EchoResponse>> echoAsync(const EchoRequest& req) const {
return std::async(std::launch::async, [this, req]() { return this->echo(req); });
}
std::string version() const {
Result<std::string> version() const {
const auto ffi_req_ = MyTimerVersionReq{};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<std::string>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return my_timer_version(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<std::string>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<std::string>::err(ffi_raw_.error());
return decodeCborFFI<std::string>(ffi_raw_.value());
}
std::future<std::string> versionAsync() const {
std::future<Result<std::string>> versionAsync() const {
return std::async(std::launch::async, [this]() { return this->version(); });
}
ComplexResponse complex(const ComplexRequest& req) const {
Result<ComplexResponse> complex(const ComplexRequest& req) const {
const auto ffi_req_ = MyTimerComplexReq{req};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<ComplexResponse>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return my_timer_complex(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<ComplexResponse>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<ComplexResponse>::err(ffi_raw_.error());
return decodeCborFFI<ComplexResponse>(ffi_raw_.value());
}
std::future<ComplexResponse> complexAsync(const ComplexRequest& req) const {
std::future<Result<ComplexResponse>> complexAsync(const ComplexRequest& req) const {
return std::async(std::launch::async, [this, req]() { return this->complex(req); });
}
ScheduleResult schedule(const JobSpec& job, const RetryPolicy& retry, const ScheduleConfig& schedule) const {
Result<ScheduleResult> schedule(const JobSpec& job, const RetryPolicy& retry, const ScheduleConfig& schedule) const {
const auto ffi_req_ = MyTimerScheduleReq{job, retry, schedule};
const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);
const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
auto ffi_enc_ = encodeCborFFI(ffi_req_);
if (ffi_enc_.isErr()) return Result<ScheduleResult>::err(ffi_enc_.error());
const auto& ffi_req_bytes_ = ffi_enc_.value();
auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {
return my_timer_schedule(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());
}, timeout_);
return decodeCborFFI<ScheduleResult>(ffi_raw_);
if (ffi_raw_.isErr()) return Result<ScheduleResult>::err(ffi_raw_.error());
return decodeCborFFI<ScheduleResult>(ffi_raw_.value());
}
std::future<ScheduleResult> scheduleAsync(const JobSpec& job, const RetryPolicy& retry, const ScheduleConfig& schedule) const {
std::future<Result<ScheduleResult>> scheduleAsync(const JobSpec& job, const RetryPolicy& retry, const ScheduleConfig& schedule) const {
return std::async(std::launch::async, [this, job, retry, schedule]() { return this->schedule(job, retry, schedule); });
}
@ -860,11 +921,6 @@ private:
explicit TypedListener(std::function<void(const T&)> f) : fn(std::move(f)) {}
};
struct WildcardListener : ListenerBase {
std::function<void(int, const std::string&, std::span<const std::uint8_t>)> fn;
explicit WildcardListener(std::function<void(int, const std::string&, std::span<const std::uint8_t>)> f) : fn(std::move(f)) {}
};
template <class T>
static void typedTrampoline(int ret, const char* msg, std::size_t len, void* ud) {
if (!ud || ret != 0 || !msg || len == 0) return;
@ -880,29 +936,6 @@ private:
listener->fn(payload);
}
static void wildcardTrampoline(int ret, const char* msg, std::size_t len, void* ud) {
if (!ud) return;
auto* listener = static_cast<WildcardListener*>(ud);
if (!listener->fn) return;
std::span<const std::uint8_t> envelope{};
if (msg && len > 0) {
envelope = std::span<const std::uint8_t>(reinterpret_cast<const std::uint8_t*>(msg), len);
}
std::string eventId;
if (ret == 0 && !envelope.empty()) {
CborParser parser; CborValue it;
if (cbor_parser_init(envelope.data(), envelope.size(), 0, &parser, &it) == CborNoError
&& cbor_value_is_map(&it)) {
CborValue evtField;
if (cbor_value_map_find_value(&it, "eventType", &evtField) == CborNoError
&& cbor_value_is_text_string(&evtField)) {
(void)decode_cbor(evtField, eventId);
}
}
}
listener->fn(ret, eventId, envelope);
}
void* ptr_;
std::chrono::milliseconds timeout_;
std::unordered_map<std::uint64_t, std::unique_ptr<ListenerBase>> listeners_;

View File

@ -84,6 +84,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "my_timer"
version = "0.1.0"
dependencies = [
"ciborium",
"flume",
"serde",
"tokio",
]
[[package]]
name = "pin-project-lite"
version = "0.2.17"
@ -164,16 +174,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "timer"
version = "0.1.0"
dependencies = [
"ciborium",
"flume",
"serde",
"tokio",
]
[[package]]
name = "tokio"
version = "1.52.3"
@ -181,6 +181,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe"
dependencies = [
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]

View File

@ -8,3 +8,6 @@ serde = { version = "1", features = ["derive"] }
ciborium = "0.2"
flume = { version = "0.11", default-features = false, features = ["async"] }
tokio = { version = "1", features = ["sync", "time"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time"] }

View File

@ -0,0 +1,53 @@
//! Synchronous example: exercises the library-event listener API
//! (typed + wildcard + remove).
//!
//! Run with: `cargo run --example main`
use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
use std::os::raw::c_int;
use std::sync::mpsc;
use std::time::Duration;
fn main() -> Result<(), String> {
let ctx = MyTimerCtx::create(
TimerConfig { name: "rust-sync-demo".into() },
Duration::from_secs(5),
)?;
// Typed listener: the closure is invoked on the lib's dispatch
// thread, so forward the payload to `main` via std mpsc and block
// on `recv_timeout` below. `add_on_echo_fired_listener` is generated
// per `{.ffiEvent.}`-declared proc and takes a typed `&EchoEvent`.
let (tx, rx) = mpsc::channel::<EchoEvent>();
let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| {
let _ = tx.send(evt.clone());
});
// Wildcard listener: receives every event with the FFI return code,
// the wire `event_id` pre-extracted from the CBOR envelope, and the
// raw envelope bytes. Lift to a typed payload via
// `decode_event_payload::<T>` when the event_id matches one you
// care about — this avoids hand-rolling ciborium calls per branch.
let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| {
println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len());
if ret == 0 && event_id == "on_echo_fired" {
match decode_event_payload::<EchoEvent>(envelope) {
Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count),
Err(e) => println!(" decode failed: {}", e),
}
}
});
// Trigger the event — fires `on_echo_fired` once, which the
// dispatch thread delivers to both listeners above.
ctx.echo(EchoRequest { message: "sync-event-demo".into(), delay_ms: 1 })?;
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(evt) => println!("typed onEchoFired: message={}, echo_count={}", evt.message, evt.echo_count),
Err(e) => return Err(format!("event never arrived: {}", e)),
}
ctx.remove_event_listener(typed_handle);
ctx.remove_event_listener(wildcard_handle);
Ok(())
}

View File

@ -0,0 +1,60 @@
//! Tokio (async) example: same shape as `main.rs` but exercises the
//! async `_async` API and bridges library events into a tokio-aware
//! channel for async consumption.
//!
//! Run with: `cargo run --example tokio_main`
use my_timer::{decode_event_payload, EchoEvent, EchoRequest, MyTimerCtx, TimerConfig};
use std::os::raw::c_int;
use std::time::Duration;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), String> {
let ctx = MyTimerCtx::new_async(
TimerConfig { name: "rust-tokio-demo".into() },
Duration::from_secs(5),
)
.await?;
// Typed listener: the handler fires on the lib's dispatch thread,
// which is *outside* the tokio runtime. Forwarding through a tokio
// `unbounded_channel` (Sender is Send + Sync, non-blocking) hands
// the event over to the runtime so we can `.await` it below.
let (typed_tx, mut typed_rx) = mpsc::unbounded_channel::<EchoEvent>();
let typed_handle = ctx.add_on_echo_fired_listener(move |evt: &EchoEvent| {
let _ = typed_tx.send(evt.clone());
});
// Wildcard listener: receives every event with the FFI return code,
// the wire `event_id` pre-extracted from the CBOR envelope, and the
// raw envelope bytes. Lift to a typed payload via
// `decode_event_payload::<T>` when the event_id matches one you
// care about — this avoids hand-rolling ciborium calls per branch.
let wildcard_handle = ctx.add_event_listener(|ret: c_int, event_id: &str, envelope: &[u8]| {
println!("wildcard: ret={}, event_id={}, bytes={}", ret, event_id, envelope.len());
if ret == 0 && event_id == "on_echo_fired" {
match decode_event_payload::<EchoEvent>(envelope) {
Ok(evt) => println!(" decoded: message={}, echo_count={}", evt.message, evt.echo_count),
Err(e) => println!(" decode failed: {}", e),
}
}
});
// Trigger an echo via the async API — fires `on_echo_fired` once,
// which the dispatch thread delivers to both listeners above.
ctx.echo_async(EchoRequest { message: "async-event-demo".into(), delay_ms: 1 })
.await?;
// Await the typed event with a bounded timeout so a missing event
// surfaces as an error instead of hanging the example forever.
let evt = tokio::time::timeout(Duration::from_secs(2), typed_rx.recv())
.await
.map_err(|_| "event never arrived".to_string())?
.ok_or_else(|| "typed channel closed".to_string())?;
println!("typed onEchoFired: message={}, echo_count={}", evt.message, evt.echo_count);
ctx.remove_event_listener(typed_handle);
ctx.remove_event_listener(wildcard_handle);
Ok(())
}

View File

@ -98,63 +98,33 @@ where
}
}
/// Typed event handlers for `MyTimerCtx`. Each field is `None` by
/// default; set the ones you care about and pass to
/// `MyTimerCtx::set_event_handlers`.
#[allow(non_snake_case)]
pub struct Events {
pub on_error: Option<Box<dyn Fn(&str) + Send + Sync>>,
pub onEchoFired: Option<Box<dyn Fn(&EchoEvent) + Send + Sync>>,
struct OnEchoFiredHandler {
f: Box<dyn Fn(&EchoEvent) + Send + Sync>,
}
impl Default for Events {
fn default() -> Self {
Self { on_error: None, onEchoFired: None }
}
}
unsafe extern "C" fn my_timer_event_trampoline(
unsafe extern "C" fn on_echo_fired_trampoline(
ret: c_int, msg: *const c_char, len: usize, ud: *mut c_void,
) {
if ud.is_null() { return; }
let events = &*(ud as *const Events);
if ret != 0 {
if let Some(ref on_err) = events.on_error {
let bytes = if !msg.is_null() && len > 0 {
slice::from_raw_parts(msg as *const u8, len)
} else { &[] };
let s = String::from_utf8_lossy(bytes);
on_err(&s);
}
if ud.is_null() || ret != 0 || msg.is_null() || len == 0 {
return;
}
if msg.is_null() || len == 0 { return; }
let h = &*(ud as *const OnEchoFiredHandler);
let bytes = slice::from_raw_parts(msg as *const u8, len);
#[derive(serde::Deserialize)]
struct EnvelopeMeta {
#[serde(rename = "eventType")]
event_type: String,
}
let meta: EnvelopeMeta = match ciborium::de::from_reader(bytes) {
Ok(m) => m,
Err(_) => return,
};
if meta.event_type == "on_echo_fired" {
#[derive(serde::Deserialize)]
struct Envelope { payload: EchoEvent }
if let Ok(env) = ciborium::de::from_reader::<Envelope, _>(bytes) {
if let Some(ref h) = events.onEchoFired { h(&env.payload); }
}
return;
struct Envelope { payload: EchoEvent }
if let Ok(env) = ciborium::de::from_reader::<Envelope, _>(bytes) {
(h.f)(&env.payload);
}
}
#[derive(Debug, Clone, Copy)]
pub struct ListenerHandle { pub id: u64 }
/// High-level context for `MyTimer`.
pub struct MyTimerCtx {
ptr: *mut c_void,
timeout: Duration,
events: *mut Events,
event_listener_id: u64,
listeners: std::sync::Mutex<std::collections::HashMap<u64, Box<dyn std::any::Any + Send>>>,
}
// SAFETY: The `ptr` field points to an FFIContext owned by the Nim runtime.
@ -174,10 +144,6 @@ impl Drop for MyTimerCtx {
unsafe { ffi::my_timer_destroy(self.ptr); }
self.ptr = std::ptr::null_mut();
}
if !self.events.is_null() {
unsafe { drop(Box::from_raw(self.events)); }
self.events = std::ptr::null_mut();
}
}
}
@ -191,7 +157,7 @@ impl MyTimerCtx {
})?;
let addr_str: String = decode_cbor(&raw_bytes)?;
let addr: usize = addr_str.parse().map_err(|e: std::num::ParseIntError| e.to_string())?;
Ok(Self { ptr: addr as *mut c_void, timeout, events: std::ptr::null_mut(), event_listener_id: 0 })
Ok(Self { ptr: addr as *mut c_void, timeout, listeners: std::sync::Mutex::new(std::collections::HashMap::new()) })
}
pub async fn new_async(config: TimerConfig, timeout: Duration) -> Result<Self, String> {
@ -203,30 +169,44 @@ impl MyTimerCtx {
}).await?;
let addr_str: String = decode_cbor(&raw_bytes)?;
let addr: usize = addr_str.parse().map_err(|e: std::num::ParseIntError| e.to_string())?;
Ok(Self { ptr: addr as *mut c_void, timeout, events: std::ptr::null_mut(), event_listener_id: 0 })
Ok(Self { ptr: addr as *mut c_void, timeout, listeners: std::sync::Mutex::new(std::collections::HashMap::new()) })
}
/// Attach typed event handlers. Each call removes any previous
/// listener via `_remove_event_listener` before adding the new
/// one, so the registry never holds a pointer into a freed box.
pub fn set_event_handlers(&mut self, handlers: Events) {
if self.event_listener_id != 0 {
unsafe {
let _ = ffi::my_timer_remove_event_listener(self.ptr, self.event_listener_id);
}
self.event_listener_id = 0;
}
if !self.events.is_null() {
unsafe { drop(Box::from_raw(self.events)); }
self.events = std::ptr::null_mut();
}
let raw = Box::into_raw(Box::new(handlers));
self.events = raw;
unsafe {
self.event_listener_id = ffi::my_timer_add_event_listener(
self.ptr, b"\0".as_ptr() as *const c_char,
my_timer_event_trampoline, raw as *mut c_void);
fn add_listener_inner(
&self,
event_name: *const c_char,
callback: ffi::FFICallback,
raw: *mut c_void,
owned: Box<dyn std::any::Any + Send>,
) -> ListenerHandle {
let id = unsafe {
ffi::my_timer_add_event_listener(self.ptr, event_name, callback, raw)
};
if id != 0 {
self.listeners.lock().unwrap().insert(id, owned);
}
ListenerHandle { id }
}
/// Register a typed listener for `on_echo_fired`. The returned handle can be
/// passed to `remove_event_listener` to unregister.
pub fn add_on_echo_fired_listener<F>(&self, handler: F) -> ListenerHandle
where F: Fn(&EchoEvent) + Send + Sync + 'static,
{
let owned: Box<OnEchoFiredHandler> = Box::new(OnEchoFiredHandler { f: Box::new(handler) });
let raw = &*owned as *const OnEchoFiredHandler as *mut c_void;
self.add_listener_inner(b"on_echo_fired\0".as_ptr() as *const c_char, on_echo_fired_trampoline, raw, owned)
}
/// Remove a previously-registered listener by handle. Returns true
/// if the listener existed and was removed; false otherwise.
pub fn remove_event_listener(&self, handle: ListenerHandle) -> bool {
if handle.id == 0 { return false; }
let rc = unsafe {
ffi::my_timer_remove_event_listener(self.ptr, handle.id)
};
self.listeners.lock().unwrap().remove(&handle.id);
rc == 0
}
pub fn echo(&self, req: EchoRequest) -> Result<EchoResponse, String> {

View File

@ -96,6 +96,16 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "my_timer"
version = "0.1.0"
dependencies = [
"ciborium",
"flume",
"serde",
"tokio",
]
[[package]]
name = "pin-project-lite"
version = "0.2.17"
@ -124,8 +134,8 @@ dependencies = [
name = "rust_client"
version = "0.1.0"
dependencies = [
"my_timer",
"serde_json",
"timer",
"tokio",
]
@ -198,16 +208,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "timer"
version = "0.1.0"
dependencies = [
"ciborium",
"flume",
"serde",
"tokio",
]
[[package]]
name = "tokio"
version = "1.52.1"

View File

@ -15,6 +15,7 @@ const CppPtrType* = "uint64_t"
## reflected in the generated bindings without touching this codegen.
const
HeaderPreludeTpl = staticRead("templates/cpp/header_prelude.hpp.tpl")
ResultTpl = staticRead("templates/cpp/result.hpp.tpl")
CborHelpersTpl = staticRead("templates/cpp/cbor_helpers.hpp.tpl")
SyncCallHelperTpl = staticRead("templates/cpp/sync_call_helper.hpp.tpl")
ContextRuleOf5Tpl = staticRead("templates/cpp/context_rule_of_5.hpp.tpl")
@ -144,10 +145,7 @@ proc emitEventDispatcher(
## per declared `{.ffiEvent.}`. Internally registers under the wire
## event name; the per-listener trampoline decodes the CBOR
## envelope's `payload` field as `T` and invokes the user handler.
## - `addEventListener(std::function<void(int, const std::string&)>) ->
## ListenerHandle` registers a catch-all wildcard listener
## (event_name == "") that receives every event as raw envelope
## bytes plus the FFI return code.
## Callers subscribe to each event separately.
## - `removeEventListener(ListenerHandle) -> bool` drops a listener by
## handle. After it returns true, no further callbacks for that id
## are in flight on the FFI side (the Nim-side registry lock plus
@ -188,30 +186,6 @@ proc emitEventDispatcher(
lines.add(" return ListenerHandle{id};")
lines.add(" }")
lines.add("")
# Generic wildcard registration.
#
# The handler receives the FFI return code, the wire `eventType` string
# extracted from the CBOR envelope (empty if the envelope is malformed
# or `ret != 0`), and a `std::span` view over the raw envelope bytes —
# the buffer is owned by the dylib and stays valid for the duration of
# the synchronous callback only. Pair with `decodeEventPayload<T>` to
# lift the payload into a typed value without hand-rolling CBOR parsing.
lines.add(
" ListenerHandle addEventListener(std::function<void(int, const std::string&, std::span<const std::uint8_t>)> handler) {"
)
lines.add(
" auto owned = std::make_unique<WildcardListener>(std::move(handler));"
)
lines.add(" auto* raw = owned.get();")
lines.add(" const auto id = $1_add_event_listener(" % [libName])
lines.add(
" ptr_, \"\", &$1::wildcardTrampoline, raw);" % [ctxTypeName]
)
lines.add(" if (id == 0) return ListenerHandle{0};")
lines.add(" listeners_.emplace(id, std::move(owned));")
lines.add(" return ListenerHandle{id};")
lines.add(" }")
lines.add("")
# Remove by handle.
lines.add(" bool removeEventListener(ListenerHandle handle) {")
lines.add(" if (handle.id == 0) return false;")
@ -230,16 +204,10 @@ proc emitEventTrampoline(
## `emitEventDispatcher`:
##
## - `ListenerBase` is a polymorphic base so the context's
## `listeners_` map can own typed and wildcard listeners under a
## single value type.
## `listeners_` map can own typed listeners under a single value type.
## - `TypedListener<T>` holds the user's `std::function<void(const T&)>`
## and is the target of `typedTrampoline<T>`, which CBOR-decodes the
## envelope's `payload` field as `T` and invokes the handler.
## - `WildcardListener` holds a `std::function<void(int, const
## std::string&, std::span<const std::uint8_t>)>` and is the target
## of `wildcardTrampoline`, which forwards the FFI return code plus
## a span view over the raw payload bytes (no copy — the dylib owns
## the buffer for the duration of the synchronous callback).
if events.len == 0:
return
lines.add(" struct ListenerBase {")
@ -252,15 +220,6 @@ proc emitEventTrampoline(
lines.add(" explicit TypedListener(std::function<void(const T&)> f) : fn(std::move(f)) {}")
lines.add(" };")
lines.add("")
lines.add(" struct WildcardListener : ListenerBase {")
lines.add(
" std::function<void(int, const std::string&, std::span<const std::uint8_t>)> fn;"
)
lines.add(
" explicit WildcardListener(std::function<void(int, const std::string&, std::span<const std::uint8_t>)> f) : fn(std::move(f)) {}"
)
lines.add(" };")
lines.add("")
# Typed trampoline — one instantiation per payload type, all sharing a body.
lines.add(" template <class T>")
lines.add(" static void typedTrampoline(int ret, const char* msg, std::size_t len, void* ud) {")
@ -283,45 +242,6 @@ proc emitEventTrampoline(
lines.add(" listener->fn(payload);")
lines.add(" }")
lines.add("")
# Wildcard trampoline — extracts `eventType` from the CBOR envelope so
# the user can match on the event name without hand-parsing. Falls back
# to an empty `eventId` if the envelope is missing, malformed, or the
# FFI return code signals an error (in which case the bytes are an
# error string, not a CBOR envelope).
lines.add(
" static void wildcardTrampoline(int ret, const char* msg, std::size_t len, void* ud) {"
)
lines.add(" if (!ud) return;")
lines.add(" auto* listener = static_cast<WildcardListener*>(ud);")
lines.add(" if (!listener->fn) return;")
# The buffer pointed to by `msg` is owned by the dylib and stays valid
# for the duration of this synchronous call — safe to hand to the user
# as a span view rather than copying.
lines.add(" std::span<const std::uint8_t> envelope{};")
lines.add(" if (msg && len > 0) {")
lines.add(
" envelope = std::span<const std::uint8_t>(reinterpret_cast<const std::uint8_t*>(msg), len);"
)
lines.add(" }")
lines.add(" std::string eventId;")
lines.add(" if (ret == 0 && !envelope.empty()) {")
lines.add(" CborParser parser; CborValue it;")
lines.add(
" if (cbor_parser_init(envelope.data(), envelope.size(), 0, &parser, &it) == CborNoError"
)
lines.add(" && cbor_value_is_map(&it)) {")
lines.add(" CborValue evtField;")
lines.add(
" if (cbor_value_map_find_value(&it, \"eventType\", &evtField) == CborNoError"
)
lines.add(" && cbor_value_is_text_string(&evtField)) {")
lines.add(" (void)decode_cbor(evtField, eventId);")
lines.add(" }")
lines.add(" }")
lines.add(" }")
lines.add(" listener->fn(ret, eventId, envelope);")
lines.add(" }")
lines.add("")
proc generateCppHeader*(
procs: seq[FFIProcMeta],
@ -334,10 +254,13 @@ proc generateCppHeader*(
lines.add(HeaderPreludeTpl)
if events.len > 0:
# Only pulled in when the library declares `{.ffiEvent.}` procs —
# `<unordered_map>` backs the `listeners_` map, `<span>` is the
# zero-copy view type handed to wildcard callbacks.
# `<unordered_map>` backs the `listeners_` map.
lines.add("#include <unordered_map>")
lines.add("#include <span>")
# Result<T> is the exception-free return channel used by every generated
# entry point. It must precede the CBOR helpers and sync-call helper below,
# which now hand their failures back as Result rather than throwing.
lines.add(ResultTpl)
# CBOR primitive / container helpers must precede the per-struct codecs
# below, because each emitted `encode_cbor`/`decode_cbor(T)` calls the
@ -431,33 +354,6 @@ proc generateCppHeader*(
lines.add(SyncCallHelperTpl)
# ── Event-payload decoder helper ──────────────────────────────────────────
# Lets wildcard-listener bodies lift the `payload` field out of a CBOR
# envelope into any registered event type with a single call, e.g.
# EchoEvent evt;
# if (decodeEventPayload(envelope, evt)) { ... }
# Relies on the per-struct `decode_cbor` codec emitted above.
if events.len > 0:
lines.add("template <class T>")
lines.add(
"inline bool decodeEventPayload(std::span<const std::uint8_t> envelope, T& out) {"
)
lines.add(" if (envelope.empty()) return false;")
lines.add(" CborParser parser; CborValue it;")
lines.add(
" if (cbor_parser_init(envelope.data(), envelope.size(), 0, &parser, &it) != CborNoError)"
)
lines.add(" return false;")
lines.add(" if (!cbor_value_is_map(&it)) return false;")
lines.add(" CborValue payloadField;")
lines.add(
" if (cbor_value_map_find_value(&it, \"payload\", &payloadField) != CborNoError)"
)
lines.add(" return false;")
lines.add(" return decode_cbor(payloadField, out) == CborNoError;")
lines.add("}")
lines.add("")
# ── High-level C++ context class ──────────────────────────────────────────
var ctors: seq[FFIProcMeta] = @[]
var methods: seq[FFIProcMeta] = @[]
@ -519,32 +415,43 @@ proc generateCppHeader*(
# context owns library threads, so we forbid copy/move on the class
# itself (see ContextRuleOf5Tpl) and hand out ownership through a
# smart pointer that callers can move, store in containers, etc.
let createRet = "Result<std::unique_ptr<$1>>" % [ctxTypeName]
lines.add(
" static std::unique_ptr<$1> create($2) {" %
[ctxTypeName, ctorParamsWithTimeout]
" static $1 create($2) {" % [createRet, ctorParamsWithTimeout]
)
lines.add(" const auto ffi_req_ = $1;" % [reqInit])
lines.add(" const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);")
lines.add(" const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {")
lines.add(" auto ffi_enc_ = encodeCborFFI(ffi_req_);")
lines.add(" if (ffi_enc_.isErr()) return $1::err(ffi_enc_.error());" % [createRet])
lines.add(" const auto& ffi_req_bytes_ = ffi_enc_.value();")
lines.add(" auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {")
lines.add(
" (void)$1(ffi_req_bytes_.data(), ffi_req_bytes_.size(), cb, ud);" %
[ctor.procName]
)
lines.add(" return 0;")
lines.add(" }, timeout);")
lines.add(" const auto addr_str = decodeCborFFI<std::string>(ffi_raw_);")
lines.add(" try {")
lines.add(" const auto addr = std::stoull(addr_str);")
# Use `new` directly (not std::make_unique) so the ctor can stay private.
lines.add(" if (ffi_raw_.isErr()) return $1::err(ffi_raw_.error());" % [createRet])
lines.add(" auto ffi_addr_ = decodeCborFFI<std::string>(ffi_raw_.value());")
lines.add(" if (ffi_addr_.isErr()) return $1::err(ffi_addr_.error());" % [createRet])
lines.add(" const auto& addr_str = ffi_addr_.value();")
# Parse the ctx address without exceptions: std::stoull would throw on a
# non-numeric payload, so use std::from_chars and surface the failure as
# an err() Result instead.
lines.add(" std::uint64_t addr = 0;")
lines.add(" const char* addr_begin = addr_str.data();")
lines.add(" const char* addr_end = addr_begin + addr_str.size();")
lines.add(" const auto fc_ = std::from_chars(addr_begin, addr_end, addr);")
lines.add(" if (fc_.ec != std::errc() || fc_.ptr != addr_end) {")
lines.add(
" return std::unique_ptr<$1>(new $1(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout));" %
[ctxTypeName]
)
lines.add(" } catch (const std::exception&) {")
lines.add(
" throw std::runtime_error(\"FFI create returned non-numeric address: \" + addr_str);"
" return $1::err(\"FFI create returned non-numeric address: \" + addr_str);" %
[createRet]
)
lines.add(" }")
# Use `new` directly (not std::make_unique) so the ctor can stay private.
lines.add(
" return $1::ok(std::unique_ptr<$2>(new $2(reinterpret_cast<void*>(static_cast<uintptr_t>(addr)), timeout)));" %
[createRet, ctxTypeName]
)
lines.add(" }")
lines.add("")
@ -559,7 +466,7 @@ proc generateCppHeader*(
else:
"timeout"
lines.add(
" static std::future<std::unique_ptr<$1>> createAsync($2) {" %
" static std::future<Result<std::unique_ptr<$1>>> createAsync($2) {" %
[ctxTypeName, ctorParamsWithTimeout]
)
lines.add(
@ -602,18 +509,22 @@ proc generateCppHeader*(
let reqInit = cppBracedInit(reqName, methParamNames)
let methRet = "Result<$1>" % [retCppType]
# Use a single-underscore-suffixed local for the Req envelope so it can't
# shadow a method parameter whose name happens to be `req` (or similar).
lines.add(" $1 $2($3) const {" % [retCppType, methodName, methParamsStr])
lines.add(" $1 $2($3) const {" % [methRet, methodName, methParamsStr])
lines.add(" const auto ffi_req_ = $1;" % [reqInit])
lines.add(" const auto ffi_req_bytes_ = encodeCborFFI(ffi_req_);")
lines.add(" const auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {")
lines.add(" auto ffi_enc_ = encodeCborFFI(ffi_req_);")
lines.add(" if (ffi_enc_.isErr()) return $1::err(ffi_enc_.error());" % [methRet])
lines.add(" const auto& ffi_req_bytes_ = ffi_enc_.value();")
lines.add(" auto ffi_raw_ = ffi_call_([&](FFICallback cb, void* ud) {")
lines.add(
" return $1(ptr_, cb, ud, ffi_req_bytes_.data(), ffi_req_bytes_.size());" %
[m.procName]
)
lines.add(" }, timeout_);")
lines.add(" return decodeCborFFI<$1>(ffi_raw_);" % [retCppType])
lines.add(" if (ffi_raw_.isErr()) return $1::err(ffi_raw_.error());" % [methRet])
lines.add(" return decodeCborFFI<$1>(ffi_raw_.value());" % [retCppType])
lines.add(" }")
lines.add("")
# The async wrapper calls the sync method via `this->methodName(...)` so
@ -623,7 +534,7 @@ proc generateCppHeader*(
if methParamsStr.len > 0:
lines.add(
" std::future<$1> $2Async($3) const {" %
[retCppType, methodName, methParamsStr]
[methRet, methodName, methParamsStr]
)
lines.add(
" return std::async(std::launch::async, [this, $1]() { return this->$2($3); });" %
@ -631,7 +542,7 @@ proc generateCppHeader*(
)
lines.add(" }")
else:
lines.add(" std::future<$1> $2Async() const {" % [retCppType, methodName])
lines.add(" std::future<$1> $2Async() const {" % [methRet, methodName])
lines.add(
" return std::async(std::launch::async, [this]() { return this->$1(); });" %
[methodName]
@ -640,13 +551,13 @@ proc generateCppHeader*(
lines.add("")
lines.add("private:")
# Listener machinery (`ListenerBase`, `TypedListener<T>`,
# `WildcardListener`, plus the static trampolines) must appear before
# the `listeners_` data member declaration — C++ requires the value
# type of a member to be complete at point of declaration. The public
# add*/remove methods above also reference these types, but member
# function bodies see the full class scope regardless of declaration
# order, so emitting here is sufficient for both.
# Listener machinery (`ListenerBase`, `TypedListener<T>`, plus the
# static trampolines) must appear before the `listeners_` data member
# declaration — C++ requires the value type of a member to be complete
# at point of declaration. The public add*/remove methods above also
# reference these types, but member function bodies see the full class
# scope regardless of declaration order, so emitting here is sufficient
# for both.
emitEventTrampoline(lines, events)
lines.add(" void* ptr_;")
lines.add(" std::chrono::milliseconds timeout_;")

View File

@ -74,6 +74,8 @@ proc generateCargoToml*(libName: string): string =
# pulling its async-std/futures shims.
# `tokio` is needed only for `tokio::time::timeout` around the async
# `recv_async`. Feature-gating tokio (item 11) is a follow-up commit.
# `[dev-dependencies]` lets the bundled `examples/` use `#[tokio::main]`
# without pulling those features into the library's runtime profile.
return
"""[package]
name = "$1"
@ -85,6 +87,9 @@ serde = { version = "1", features = ["derive"] }
ciborium = "0.2"
flume = { version = "0.11", default-features = false, features = ["async"] }
tokio = { version = "1", features = ["sync", "time"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time"] }
""" %
[libName]
@ -207,8 +212,8 @@ proc generateFFIRs*(procs: seq[FFIProcMeta]): string =
params.add("ctx: *mut c_void")
lines.add(" pub fn $1($2) -> c_int;" % [p.procName, params.join(", ")])
# Listener-registration ABI — emitted by `declareLibrary`, always
# present in the dylib.
# Listener-registration ABI — emitted on the Nim side by `declareLibrary`,
# always present in the dylib.
lines.add(
" pub fn $1_add_event_listener(ctx: *mut c_void, event_name: *const c_char, callback: FFICallback, user_data: *mut c_void) -> u64;" %
[linkLibName]
@ -446,78 +451,44 @@ proc generateApiRs*(
lines.add("}")
lines.add("")
# ── Typed event handler struct + trampoline (only if events declared) ────
# The Events struct holds optional boxed closures, one per registered
# `{.ffiEvent.}`. The struct lives on the heap (Box::into_raw); its raw
# pointer is handed to the dylib as `user_data` for the event callback.
# The trampoline parses the CBOR `EventEnvelope`, picks the matching
# field on Events, decodes the payload as the registered type, and
# invokes the closure.
# ── Per-listener handler boxes + extern "C" trampolines ─────────────────
# Each registered listener owns a `Box<…Handler>` that is kept alive in
# `$1::listeners` (keyed by listener id). The raw pointer to the inner
# handler is handed to the dylib as `user_data` for the per-event
# trampoline below.
if events.len > 0:
lines.add("/// Typed event handlers for `$1`. Each field is `None` by" % [ctxTypeName])
lines.add("/// default; set the ones you care about and pass to")
lines.add("/// `$1::set_event_handlers`." % [ctxTypeName])
lines.add("#[allow(non_snake_case)]")
lines.add("pub struct Events {")
lines.add(" pub on_error: Option<Box<dyn Fn(&str) + Send + Sync>>,")
for ev in events:
let handlerStruct = capitalizeFirstLetter(ev.nimProcName) & "Handler"
let trampolineName = camelToSnakeCase(ev.nimProcName) & "_trampoline"
lines.add("struct $1 {" % [handlerStruct])
lines.add(
" pub $1: Option<Box<dyn Fn(&$2) + Send + Sync>>," %
[ev.nimProcName, ev.payloadTypeName]
" f: Box<dyn Fn(&$1) + Send + Sync>," % [ev.payloadTypeName]
)
lines.add("}")
lines.add("")
lines.add("impl Default for Events {")
lines.add(" fn default() -> Self {")
lines.add(" Self { on_error: None, " &
events.mapIt(it.nimProcName & ": None").join(", ") & " }")
lines.add(" }")
lines.add("}")
lines.add("")
# Trampoline — `extern "C"` free function. Deserialises the envelope
# twice: once for `eventType`, then with the typed payload via serde.
lines.add("unsafe extern \"C\" fn $1_event_trampoline(" % [libName])
lines.add(" ret: c_int, msg: *const c_char, len: usize, ud: *mut c_void,")
lines.add(") {")
lines.add(" if ud.is_null() { return; }")
lines.add(" let events = &*(ud as *const Events);")
lines.add(" if ret != 0 {")
lines.add(" if let Some(ref on_err) = events.on_error {")
lines.add(" let bytes = if !msg.is_null() && len > 0 {")
lines.add(" slice::from_raw_parts(msg as *const u8, len)")
lines.add(" } else { &[] };")
lines.add(" let s = String::from_utf8_lossy(bytes);")
lines.add(" on_err(&s);")
lines.add(" }")
lines.add(" return;")
lines.add(" }")
lines.add(" if msg.is_null() || len == 0 { return; }")
lines.add(" let bytes = slice::from_raw_parts(msg as *const u8, len);")
lines.add(" #[derive(serde::Deserialize)]")
lines.add(" struct EnvelopeMeta {")
lines.add(" #[serde(rename = \"eventType\")]")
lines.add(" event_type: String,")
lines.add(" }")
lines.add(" let meta: EnvelopeMeta = match ciborium::de::from_reader(bytes) {")
lines.add(" Ok(m) => m,")
lines.add(" Err(_) => return,")
lines.add(" };")
for ev in events:
lines.add(" if meta.event_type == \"$1\" {" % [ev.wireName])
lines.add(" #[derive(serde::Deserialize)]")
lines.add(" struct Envelope { payload: $1 }" % [ev.payloadTypeName])
lines.add(
" if let Ok(env) = ciborium::de::from_reader::<Envelope, _>(bytes) {"
)
lines.add(
" if let Some(ref h) = events.$1 { h(&env.payload); }" %
[ev.nimProcName]
)
lines.add(" }")
lines.add("}")
lines.add("")
lines.add("unsafe extern \"C\" fn $1(" % [trampolineName])
lines.add(" ret: c_int, msg: *const c_char, len: usize, ud: *mut c_void,")
lines.add(") {")
lines.add(" if ud.is_null() || ret != 0 || msg.is_null() || len == 0 {")
lines.add(" return;")
lines.add(" }")
lines.add("}")
lines.add(" let h = &*(ud as *const $1);" % [handlerStruct])
lines.add(" let bytes = slice::from_raw_parts(msg as *const u8, len);")
lines.add(" #[derive(serde::Deserialize)]")
lines.add(
" struct Envelope { payload: $1 }" % [ev.payloadTypeName]
)
lines.add(
" if let Ok(env) = ciborium::de::from_reader::<Envelope, _>(bytes) {"
)
lines.add(" (h.f)(&env.payload);")
lines.add(" }")
lines.add("}")
lines.add("")
# Public handle returned by every add_…_listener call.
lines.add("#[derive(Debug, Clone, Copy)]")
lines.add("pub struct ListenerHandle { pub id: u64 }")
lines.add("")
# ── Context struct ─────────────────────────────────────────────────────────
@ -526,8 +497,14 @@ proc generateApiRs*(
lines.add(" ptr: *mut c_void,")
lines.add(" timeout: Duration,")
if events.len > 0:
lines.add(" events: *mut Events,")
lines.add(" event_listener_id: u64,")
# Keeps each registered handler box alive while its listener id is
# live on the Nim side. Removing an entry from the map drops the
# Box and frees the user's closure; the Nim-side registry has
# already guaranteed no callback for that id is in flight by the
# time `_remove_event_listener` returns.
lines.add(
" listeners: std::sync::Mutex<std::collections::HashMap<u64, Box<dyn std::any::Any + Send>>>,"
)
lines.add("}")
lines.add("")
# SAFETY block applies to both impls below (PR #23 Rust review, item 7).
@ -564,13 +541,9 @@ proc generateApiRs*(
lines.add(" unsafe { ffi::$1(self.ptr); }" % [dtorProcName])
lines.add(" self.ptr = std::ptr::null_mut();")
lines.add(" }")
# Reclaim the Events box after the dylib's destroy has torn down the
# FFI thread (no more events will fire by this point).
if events.len > 0:
lines.add(" if !self.events.is_null() {")
lines.add(" unsafe { drop(Box::from_raw(self.events)); }")
lines.add(" self.events = std::ptr::null_mut();")
lines.add(" }")
# `listeners` is dropped automatically after this body returns. By
# that point the dylib has joined its threads, so no callback is mid-
# flight against any of the raw pointers we handed it.
lines.add(" }")
lines.add("}")
lines.add("")
@ -627,7 +600,7 @@ proc generateApiRs*(
" let addr: usize = addr_str.parse().map_err(|e: std::num::ParseIntError| e.to_string())?;"
)
if events.len > 0:
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout, events: std::ptr::null_mut(), event_listener_id: 0 })")
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout, listeners: std::sync::Mutex::new(std::collections::HashMap::new()) })")
else:
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout })")
lines.add(" }")
@ -653,49 +626,89 @@ proc generateApiRs*(
" let addr: usize = addr_str.parse().map_err(|e: std::num::ParseIntError| e.to_string())?;"
)
if events.len > 0:
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout, events: std::ptr::null_mut(), event_listener_id: 0 })")
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout, listeners: std::sync::Mutex::new(std::collections::HashMap::new()) })")
else:
lines.add(" Ok(Self { ptr: addr as *mut c_void, timeout })")
lines.add(" }")
lines.add("")
# ── Typed event registration ───────────────────────────────────────────
# ── Listener-registration API ─────────────────────────────────────────
if events.len > 0:
# Private helper shared by every public `add_*_listener`: the
# FFI call + map insertion is identical across the typed event
# variants, so it lives in one place. The caller owns the box
# (typed as the concrete handler struct so the raw pointer matches
# the trampoline's expected type) and only erases it to
# `dyn Any + Send` when handing ownership over.
lines.add(" fn add_listener_inner(")
lines.add(" &self,")
lines.add(" event_name: *const c_char,")
lines.add(" callback: ffi::FFICallback,")
lines.add(" raw: *mut c_void,")
lines.add(" owned: Box<dyn std::any::Any + Send>,")
lines.add(" ) -> ListenerHandle {")
lines.add(" let id = unsafe {")
lines.add(
" /// Attach typed event handlers. Each call removes any previous"
)
lines.add(
" /// listener via `_remove_event_listener` before adding the new"
)
lines.add(
" /// one, so the registry never holds a pointer into a freed box."
)
lines.add(" pub fn set_event_handlers(&mut self, handlers: Events) {")
lines.add(" if self.event_listener_id != 0 {")
lines.add(" unsafe {")
lines.add(
" let _ = ffi::$1_remove_event_listener(self.ptr, self.event_listener_id);" %
" ffi::$1_add_event_listener(self.ptr, event_name, callback, raw)" %
[libName]
)
lines.add(" }")
lines.add(" self.event_listener_id = 0;")
lines.add(" };")
lines.add(" if id != 0 {")
lines.add(" self.listeners.lock().unwrap().insert(id, owned);")
lines.add(" }")
lines.add(" if !self.events.is_null() {")
lines.add(" unsafe { drop(Box::from_raw(self.events)); }")
lines.add(" self.events = std::ptr::null_mut();")
lines.add(" }")
lines.add(" let raw = Box::into_raw(Box::new(handlers));")
lines.add(" self.events = raw;")
lines.add(" unsafe {")
lines.add(" ListenerHandle { id }")
lines.add(" }")
lines.add("")
for ev in events:
let methodName = "add_" & camelToSnakeCase(ev.nimProcName) & "_listener"
let handlerStruct = capitalizeFirstLetter(ev.nimProcName) & "Handler"
let trampolineName = camelToSnakeCase(ev.nimProcName) & "_trampoline"
lines.add(
" /// Register a typed listener for `$1`. The returned handle can be" %
[ev.wireName]
)
lines.add(" /// passed to `remove_event_listener` to unregister.")
lines.add(
" pub fn $1<F>(&self, handler: F) -> ListenerHandle" % [methodName]
)
lines.add(
" where F: Fn(&$1) + Send + Sync + 'static," % [ev.payloadTypeName]
)
lines.add(" {")
lines.add(
" let owned: Box<$1> = Box::new($1 { f: Box::new(handler) });" %
[handlerStruct]
)
lines.add(" let raw = &*owned as *const $1 as *mut c_void;" %
[handlerStruct])
lines.add(
" self.add_listener_inner(b\"$1\\0\".as_ptr() as *const c_char, $2, raw, owned)" %
[ev.wireName, trampolineName]
)
lines.add(" }")
lines.add("")
# Remove by handle. Drops the Box (and the user's closure) after the
# C ABI confirms the listener has been unregistered.
lines.add(
" self.event_listener_id = ffi::$1_add_event_listener(" %
" /// Remove a previously-registered listener by handle. Returns true"
)
lines.add(
" /// if the listener existed and was removed; false otherwise."
)
lines.add(
" pub fn remove_event_listener(&self, handle: ListenerHandle) -> bool {"
)
lines.add(" if handle.id == 0 { return false; }")
lines.add(" let rc = unsafe {")
lines.add(
" ffi::$1_remove_event_listener(self.ptr, handle.id)" %
[libName]
)
lines.add(" self.ptr, b\"\\0\".as_ptr() as *const c_char,")
lines.add(
" $1_event_trampoline, raw as *mut c_void);" % [libName]
)
lines.add(" }")
lines.add(" };")
lines.add(" self.listeners.lock().unwrap().remove(&handle.id);")
lines.add(" rc == 0")
lines.add(" }")
lines.add("")

View File

@ -1,10 +1,8 @@
cmake_minimum_required(VERSION 3.14)
project({{LIB}}_cpp_bindings CXX C)
# The generated bindings target C++20. The event-listener API uses
# std::span<const std::uint8_t> on its wildcard callback to hand the
# CBOR envelope to consumers as a zero-copy view; <span> only became
# part of the standard library in C++20.
# The generated bindings target C++20: designated initializers and other
# C++20 constructs are used throughout the emitted code.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@ -129,7 +129,7 @@ inline CborError decode_cbor(CborValue& it, std::optional<T>& out) {
// ── Public entry points ─────────────────────────────────────────────────
template<typename T>
inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
inline Result<std::vector<std::uint8_t>> encodeCborFFI(const T& value) {
// Start with a generous 4 KiB buffer; double on overflow until it fits.
std::vector<std::uint8_t> buf(4096);
while (true) {
@ -139,34 +139,34 @@ inline std::vector<std::uint8_t> encodeCborFFI(const T& value) {
if (err == CborNoError) {
const size_t used = cbor_encoder_get_buffer_size(&enc, buf.data());
buf.resize(used);
return buf;
return Result<std::vector<std::uint8_t>>::ok(std::move(buf));
}
if (err == CborErrorOutOfMemory) {
const size_t extra = cbor_encoder_get_extra_bytes_needed(&enc);
buf.resize(buf.size() + (extra > 0 ? extra : buf.size()));
continue;
}
throw std::runtime_error(std::string("FFI CBOR encode failed: ") +
cbor_error_string(err));
return Result<std::vector<std::uint8_t>>::err(
std::string("FFI CBOR encode failed: ") + cbor_error_string(err));
}
}
template<typename T>
inline T decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
inline Result<T> decodeCborFFI(const std::vector<std::uint8_t>& bytes) {
CborParser parser;
CborValue it;
CborError err = cbor_parser_init(bytes.data(), bytes.size(), 0, &parser, &it);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR parse init failed: ") +
cbor_error_string(err));
}
T out{};
err = decode_cbor(it, out);
if (err != CborNoError) {
throw std::runtime_error(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
return Result<T>::err(std::string("FFI CBOR decode failed: ") +
cbor_error_string(err));
}
return out;
return Result<T>::ok(std::move(out));
}
#endif // NIM_FFI_CBOR_HELPERS_HPP_INCLUDED

View File

@ -1,6 +1,6 @@
#pragma once
// Generated bindings require C++20 — the event-listener API uses
// std::span<const std::uint8_t> for the wildcard callback.
// Generated bindings require C++20 (designated initializers and other
// C++20 constructs are used throughout the emitted code).
// MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is passed,
// so consult _MSVC_LANG when present (it always reflects the active
// /std:c++XX level).
@ -14,7 +14,7 @@
#include <string>
#include <cstdint>
#include <chrono>
#include <stdexcept>
#include <charconv>
#include <mutex>
#include <condition_variable>
#include <memory>
@ -24,6 +24,7 @@
#include <optional>
#include <type_traits>
#include <cstring>
#include <cassert>
extern "C" {
#include <tinycbor/cbor.h>
}

View File

@ -0,0 +1,61 @@
// ============================================================
// Result<T> — exception-free error channel
// ============================================================
// The generated bindings never throw: every fallible entry point (create,
// instance methods, and their *Async futures) returns a Result<T>. Callers
// branch on isOk()/isErr() (or the explicit bool conversion) and read
// value()/error(). This mirrors the Nim side's Result[T, string] and keeps
// us off C++23's std::expected.
#ifndef NIM_FFI_RESULT_HPP_INCLUDED
#define NIM_FFI_RESULT_HPP_INCLUDED
template <typename T>
class Result {
std::optional<T> value_;
std::string error_;
public:
static Result<T> ok(T value) {
Result<T> r;
r.value_ = std::move(value);
return r;
}
static Result<T> err(std::string message) {
Result<T> r;
r.error_ = std::move(message);
return r;
}
bool isOk() const { return value_.has_value(); }
bool isErr() const { return !value_.has_value(); }
explicit operator bool() const { return isOk(); }
const T& value() const { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
T& value() { assert(value_.has_value() && "Result::value() called on err Result — check isOk() first"); return *value_; }
const T& operator*() const { assert(value_.has_value() && "Result::operator*() called on err Result — check isOk() first"); return *value_; }
const T* operator->() const { assert(value_.has_value() && "Result::operator->() called on err Result — check isOk() first"); return &*value_; }
T&& take() { assert(value_.has_value() && "Result::take() called on err Result — check isOk() first"); return std::move(*value_); }
const std::string& error() const { assert(!value_.has_value() && "Result::error() called on ok Result — check isErr() first"); return error_; }
};
template <>
class Result<void> {
bool ok_ = true;
std::string error_;
public:
static Result<void> ok() {
Result<void> r;
r.ok_ = true;
return r;
}
static Result<void> err(std::string message) {
Result<void> r;
r.ok_ = false;
r.error_ = std::move(message);
return r;
}
Result() = default;
bool isOk() const { return ok_; }
bool isErr() const { return !ok_; }
explicit operator bool() const { return isOk(); }
const std::string& error() const { assert(!ok_ && "Result<void>::error() called on ok Result — check isErr() first"); return error_; }
};
#endif // NIM_FFI_RESULT_HPP_INCLUDED

View File

@ -34,22 +34,25 @@ inline void ffi_cb_(int ret, const char* msg, size_t len, void* ud) {
s.cv.notify_one();
}
inline std::vector<std::uint8_t> ffi_call_(std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
inline Result<std::vector<std::uint8_t>> ffi_call_(
std::function<int(FFICallback, void*)> f,
std::chrono::milliseconds timeout) {
using Bytes = std::vector<std::uint8_t>;
auto state = std::make_shared<FFICallState_>();
auto* cb_ref = new std::shared_ptr<FFICallState_>(state);
const int ret = f(ffi_cb_, cb_ref);
if (ret == 2) {
delete cb_ref;
throw std::runtime_error("RET_MISSING_CALLBACK (internal error)");
return Result<Bytes>::err("RET_MISSING_CALLBACK (internal error)");
}
std::unique_lock<std::mutex> lock(state->mtx);
const bool fired = state->cv.wait_for(lock, timeout, [&]{ return state->done; });
if (!fired)
throw std::runtime_error("FFI call timed out after " + std::to_string(timeout.count()) + "ms");
return Result<Bytes>::err("FFI call timed out after " +
std::to_string(timeout.count()) + "ms");
if (!state->ok)
throw std::runtime_error(state->err);
return state->bytes;
return Result<Bytes>::err(state->err);
return Result<Bytes>::ok(std::move(state->bytes));
}
} // anonymous namespace

View File

@ -147,6 +147,82 @@ proc sendRequestToFFIThread*(
## process proc.
return ok()
type Foo = object
registerReqFFI(WatchdogReq, foo: ptr Foo):
proc(): Future[Result[string, string]] {.async.} =
return ok("FFI thread is not blocked")
type JsonNotRespondingEvent = object
eventType: string
proc init(T: type JsonNotRespondingEvent): T =
return JsonNotRespondingEvent(eventType: "not_responding")
proc `$`(event: JsonNotRespondingEvent): string =
$(%*event)
proc onNotResponding*(ctx: ptr FFIContext) =
## Shim: still emits the legacy JSON payload through the registry, so
## existing foreign consumers see no wire-shape change. A follow-up
## PR replaces this with a CBOR `NotRespondingEvent`.
## Mirrors the dispatch templates' lock-during-invocation contract
## (see `ffi_events.nim`).
withLock ctx[].eventRegistry.lock:
let snap = ctx[].eventRegistry.byEvent.getOrDefault("onNotResponding")
if snap.len == 0:
chronicles.debug "onNotResponding - no listener registered"
return
foreignThreadGc:
let event = $JsonNotRespondingEvent.init()
for listener in snap:
listener.callback(
RET_OK,
cast[ptr cchar](unsafeAddr event[0]),
cast[csize_t](len(event)),
listener.userData,
)
proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs.
## This thread never blocks.
let watchdogRun = proc(ctx: ptr FFIContext) {.async.} =
const WatchdogStartDelay = 10.seconds
const WatchdogTimeinterval = 1.seconds
const WatchdogTimeout = 20.seconds
# Give time for the node to be created and up before sending watchdog requests
let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay)
if initialStop or ctx.running.load == false:
return
while true:
let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval)
if intervalStop or ctx.running.load == false:
debug "Watchdog thread exiting because FFIContext is not running"
break
let callback = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard ## Don't do anything. Just respecting the callback signature.
const nilUserData = nil
trace "Sending watchdog request to FFI thread"
try:
sendRequestToFFIThread(
ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout
).isOkOr:
error "Failed to send watchdog request to FFI thread", error = $error
onNotResponding(ctx)
except Exception as exc:
error "Exception sending watchdog request", exc = exc.msg
onNotResponding(ctx)
waitFor watchdogRun(ctx)
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =

View File

@ -7,7 +7,7 @@
{.pragma: callback, cdecl, raises: [], gcsafe.}
import system/ansi_c
import std/[atomics, locks, options, tables]
import std/[atomics, locks, sequtils, options, tables]
import chronicles
import ./ffi_types, ./cbor_serial
@ -39,10 +39,6 @@ type
lock*: Lock
nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1.
byEvent*: Table[string, seq[FFIEventListener]]
wildcard*: seq[FFIEventListener]
const WildcardEventName* = ""
## Empty string registers a wildcard listener that receives every event.
# ---------------------------------------------------------------------------
# Registry lifecycle and mutation
@ -56,7 +52,6 @@ proc initEventRegistry*(reg: var FFIEventRegistry) =
reg.lock.initLock()
reg.nextId = 0'u64
reg.byEvent = initTable[string, seq[FFIEventListener]]()
reg.wildcard.setLen(0)
proc deinitEventRegistry*(reg: var FFIEventRegistry) =
## Mirror of `initEventRegistry`: must be called exactly once, by the
@ -70,7 +65,6 @@ proc deinitEventRegistry*(reg: var FFIEventRegistry) =
## assignment destructor against this thread's heap allocations.
reg.lock.deinitLock()
reg.byEvent = default(Table[string, seq[FFIEventListener]])
reg.wildcard = @[]
reg.nextId = 0'u64
proc addEventListener*(
@ -80,9 +74,10 @@ proc addEventListener*(
userData: pointer,
): uint64 {.raises: [].} =
## Registers `callback` for `eventName` and returns the listener's stable
## id (always non-zero on success). `eventName == ""` registers a wildcard
## listener that receives every dispatched event. Returns 0 if `callback`
## is nil — the only documented failure mode.
## id (always non-zero on success). A listener only receives events
## dispatched under its own `eventName` — subscribe to each event
## separately. Returns 0 if `callback` is nil — the only documented
## failure mode.
if callback.isNil():
return 0
@ -93,10 +88,7 @@ proc addEventListener*(
assigned = reg.nextId
let listener =
FFIEventListener(id: assigned, callback: callback, userData: userData)
if eventName.len == 0:
reg.wildcard.add(listener)
else:
reg.byEvent.mgetOrPut(eventName, @[]).add(listener)
reg.byEvent.mgetOrPut(eventName, @[]).add(listener)
return assigned
proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} =
@ -110,54 +102,41 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises:
var removed = false
withLock reg.lock:
for i in 0 ..< reg.wildcard.len:
if reg.wildcard[i].id == id:
reg.wildcard.delete(i)
var
pruneKey = ""
prune = false
for key, listeners in reg.byEvent.mpairs:
let before = listeners.len
listeners.keepItIf(it.id != id)
if listeners.len < before:
removed = true
if listeners.len == 0:
pruneKey = key
prune = true
break
if not removed:
var emptyKey = ""
var prune = false
for key, listeners in reg.byEvent.mpairs:
var idx = -1
for i in 0 ..< listeners.len:
if listeners[i].id == id:
idx = i
break
if idx >= 0:
listeners.delete(idx)
removed = true
if listeners.len == 0:
emptyKey = key
prune = true
break
if prune:
reg.byEvent.del(emptyKey)
if prune:
reg.byEvent.del(pruneKey)
return removed
proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} =
## Drops every registered listener (per-event and wildcard). Does not
## reset the listener-id counter — subsequent `addEventListener` calls
## still return strictly increasing ids.
## Drops every registered listener. Does not reset the listener-id
## counter — subsequent `addEventListener` calls still return strictly
## increasing ids.
withLock reg.lock:
reg.wildcard.setLen(0)
reg.byEvent.clear()
proc snapshotListeners*(
reg: var FFIEventRegistry, eventName: string
): seq[FFIEventListener] {.raises: [].} =
## Returns a copy of the listener slice for `eventName`, plus every
## wildcard listener. The copy is what makes re-entrant add/remove from
## inside a handler deadlock-free: dispatch holds the lock only for the
## duration of the copy, then iterates the copy outside the lock.
## Returns a copy of the listener slice for `eventName`. The copy is what
## makes re-entrant add/remove from inside a handler deadlock-free:
## dispatch holds the lock only for the duration of the copy, then
## iterates the copy outside the lock.
var snap: seq[FFIEventListener] = @[]
withLock reg.lock:
if eventName.len > 0:
# `getOrDefault` returns an empty seq when the key is absent —
# avoids the raising `[]` operator path.
for l in reg.byEvent.getOrDefault(eventName):
snap.add(l)
for l in reg.wildcard:
# `getOrDefault` returns an empty seq when the key is absent —
# avoids the raising `[]` operator path.
for l in reg.byEvent.getOrDefault(eventName):
snap.add(l)
return snap
@ -276,8 +255,8 @@ template withFFIEventDispatch(eventName: string, listeners, body: untyped) =
)
template dispatchFFIEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to every listener for `eventName` plus every
## wildcard listener. `body` must yield a `string` or `seq[byte]`.
## Dispatches an FFI event to every listener subscribed to `eventName`.
## `body` must yield a `string` or `seq[byte]`.
##
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is
## set). Holds `reg.lock` for the entire snapshot + invocation so a

View File

@ -112,8 +112,9 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped =
## ABI on its `FFIContext`:
##
## - `{libraryName}_add_event_listener(ctx, event_name, cb, ud) -> uint64`
## — registers `cb` for `event_name` and returns its stable id. An
## empty `event_name` subscribes `cb` to *every* event (catch-all).
## — registers `cb` for `event_name` and returns its stable id. `cb`
## only receives events dispatched under `event_name`; subscribe to
## each event separately.
## - `{libraryName}_remove_event_listener(ctx, id) -> cint` — returns 0 on
## success, non-zero if no listener with that id exists.
##

View File

@ -7,6 +7,11 @@
// genbindings_cpp` is callable end-to-end from C++.
// The CrossLibrary test also loads `examples/echo/cpp_bindings` to prove
// two nim-ffi libraries can coexist in one process.
//
// The generated bindings never throw: every call returns a Result<T>. The
// `mustOk` helper below unwraps a Result and fails the test (without
// aborting) when it carries an error, so single-threaded tests read as if
// the value came back directly.
#include "my_timer.hpp"
#include "echo.hpp"
@ -23,8 +28,20 @@
namespace {
// Unwrap a Result<T> in a single-threaded test context. On error it records a
// non-fatal gtest failure and returns a default-constructed T so the caller
// can keep going (subsequent expectations will fail loudly).
template <typename T>
T mustOk(Result<T> r) {
if (r.isErr()) {
ADD_FAILURE() << "unexpected FFI error: " << r.error() << " line: " << __LINE__;
return T{};
}
return r.take();
}
std::unique_ptr<MyTimerCtx> makeCtx(const std::string& name = "e2e") {
return MyTimerCtx::create(TimerConfig{name});
return mustOk(MyTimerCtx::create(TimerConfig{name}));
}
} // namespace
@ -38,19 +55,19 @@ TEST(TimerE2E, CreateAndDestroy) {
TEST(TimerE2E, VersionSync) {
auto ctx = makeCtx("version-sync");
const auto v = ctx->version();
const auto v = mustOk(ctx->version());
EXPECT_EQ(v, "nim-timer v0.1.0");
}
TEST(TimerE2E, VersionAsync) {
auto ctx = makeCtx("version-async");
auto fut = ctx->versionAsync();
EXPECT_EQ(fut.get(), "nim-timer v0.1.0");
EXPECT_EQ(mustOk(fut.get()), "nim-timer v0.1.0");
}
TEST(TimerE2E, EchoRoundTripsMessageAndTimerName) {
auto ctx = makeCtx("echo-ctx");
const auto resp = ctx->echo(EchoRequest{"hello", 10});
const auto resp = mustOk(ctx->echo(EchoRequest{"hello", 10}));
EXPECT_EQ(resp.echoed, "hello");
EXPECT_EQ(resp.timerName, "echo-ctx");
}
@ -60,7 +77,7 @@ TEST(TimerE2E, EchoHonoursDelay) {
constexpr int delayMs = 150;
const auto start = std::chrono::steady_clock::now();
const auto resp = ctx->echo(EchoRequest{"waited", delayMs});
const auto resp = mustOk(ctx->echo(EchoRequest{"waited", delayMs}));
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count();
@ -76,9 +93,9 @@ TEST(TimerE2E, ConcurrentAsyncCallsAreIndependent) {
auto f2 = ctx->echoAsync(EchoRequest{"two", 40});
auto f3 = ctx->echoAsync(EchoRequest{"three", 20});
const auto r3 = f3.get();
const auto r2 = f2.get();
const auto r1 = f1.get();
const auto r3 = mustOk(f3.get());
const auto r2 = mustOk(f2.get());
const auto r1 = mustOk(f1.get());
EXPECT_EQ(r1.echoed, "one");
EXPECT_EQ(r2.echoed, "two");
@ -97,7 +114,7 @@ TEST(TimerE2E, ComplexWithOptionalNotePresent) {
std::optional<int64_t>(2),
};
const auto resp = ctx->complex(req);
const auto resp = mustOk(ctx->complex(req));
EXPECT_EQ(resp.itemCount, 2);
EXPECT_TRUE(resp.hasNote);
EXPECT_NE(resp.summary.find("note=a note"), std::string::npos)
@ -115,7 +132,7 @@ TEST(TimerE2E, ComplexWithOptionalNoteAbsent) {
std::nullopt,
};
const auto resp = ctx->complex(req);
const auto resp = mustOk(ctx->complex(req));
EXPECT_EQ(resp.itemCount, 0);
EXPECT_FALSE(resp.hasNote);
EXPECT_NE(resp.summary.find("note=<none>"), std::string::npos)
@ -128,8 +145,8 @@ TEST(TimerE2E, IndependentContextsKeepTheirOwnState) {
auto ctxA = makeCtx("alpha");
auto ctxB = makeCtx("beta");
const auto rA = ctxA->echo(EchoRequest{"x", 5});
const auto rB = ctxB->echo(EchoRequest{"x", 5});
const auto rA = mustOk(ctxA->echo(EchoRequest{"x", 5}));
const auto rB = mustOk(ctxB->echo(EchoRequest{"x", 5}));
EXPECT_EQ(rA.timerName, "alpha");
EXPECT_EQ(rB.timerName, "beta");
@ -137,8 +154,8 @@ TEST(TimerE2E, IndependentContextsKeepTheirOwnState) {
// N contexts keep independent state; an error on one must not poison siblings.
// Empty JobSpec.name is the chosen error trigger: schedule() returns
// err("job name must not be empty"), which the bindings rethrow as
// std::runtime_error carrying the exact string.
// err("job name must not be empty"), which the bindings surface as an
// err() Result carrying the exact string.
TEST(TimerE2E, MultiContextIsolation) {
constexpr int kCtxCount = 5;
std::vector<std::unique_ptr<MyTimerCtx>> ctxs;
@ -148,7 +165,7 @@ TEST(TimerE2E, MultiContextIsolation) {
}
for (int i = 0; i < kCtxCount; ++i) {
const auto resp = ctxs[i]->echo(EchoRequest{"ping", 0});
const auto resp = mustOk(ctxs[i]->echo(EchoRequest{"ping", 0}));
EXPECT_EQ(resp.echoed, "ping");
EXPECT_EQ(resp.timerName, "iso-" + std::to_string(i));
}
@ -156,20 +173,17 @@ TEST(TimerE2E, MultiContextIsolation) {
const auto bad = JobSpec{/*name*/ "", /*payload*/ {}, /*priority*/ 0};
const auto retry = RetryPolicy{1, 10, {}};
const auto sched = ScheduleConfig{0, 0, std::nullopt};
try {
(void)ctxs[2]->schedule(bad, retry, sched);
FAIL() << "expected schedule() to throw on empty job name";
} catch (const std::runtime_error& ex) {
EXPECT_STREQ(ex.what(), "job name must not be empty");
}
const auto scheduleRes = ctxs[2]->schedule(bad, retry, sched);
ASSERT_TRUE(scheduleRes.isErr()) << "expected schedule() to fail on empty job name";
EXPECT_EQ(scheduleRes.error(), "job name must not be empty");
const auto recovered = ctxs[2]->echo(EchoRequest{"after-err", 0});
const auto recovered = mustOk(ctxs[2]->echo(EchoRequest{"after-err", 0}));
EXPECT_EQ(recovered.echoed, "after-err");
EXPECT_EQ(recovered.timerName, "iso-2");
for (int i = 0; i < kCtxCount; ++i) {
if (i == 2) continue;
const auto resp = ctxs[i]->echo(EchoRequest{"still-here", 0});
const auto resp = mustOk(ctxs[i]->echo(EchoRequest{"still-here", 0}));
EXPECT_EQ(resp.echoed, "still-here");
EXPECT_EQ(resp.timerName, "iso-" + std::to_string(i));
}
@ -177,31 +191,31 @@ TEST(TimerE2E, MultiContextIsolation) {
// Two nim-ffi libraries in one process must not share state or symbols.
TEST(TimerE2E, CrossLibrary) {
auto timerCtx = MyTimerCtx::create(TimerConfig{"x-timer"});
auto echoCtx = EchoCtx::create(EchoConfig{"X-ECHO"});
auto timerCtx = mustOk(MyTimerCtx::create(TimerConfig{"x-timer"}));
auto echoCtx = mustOk(EchoCtx::create(EchoConfig{"X-ECHO"}));
EXPECT_EQ(timerCtx->version(), "nim-timer v0.1.0");
EXPECT_EQ(echoCtx->version(), "nim-echo v0.1.0");
EXPECT_EQ(mustOk(timerCtx->version()), "nim-timer v0.1.0");
EXPECT_EQ(mustOk(echoCtx->version()), "nim-echo v0.1.0");
const auto timerResp = timerCtx->echo(EchoRequest{"hello", 0});
const auto timerResp = mustOk(timerCtx->echo(EchoRequest{"hello", 0}));
EXPECT_EQ(timerResp.echoed, "hello");
EXPECT_EQ(timerResp.timerName, "x-timer");
const auto echoResp = echoCtx->shout(ShoutRequest{"hello"});
const auto echoResp = mustOk(echoCtx->shout(ShoutRequest{"hello"}));
EXPECT_EQ(echoResp.shouted, "X-ECHO: HELLO");
EXPECT_EQ(echoResp.prefix, "X-ECHO");
for (int i = 0; i < 4; ++i) {
const auto t = timerCtx->echo(EchoRequest{"t" + std::to_string(i), 0});
const auto e = echoCtx->shout(ShoutRequest{"e" + std::to_string(i)});
const auto t = mustOk(timerCtx->echo(EchoRequest{"t" + std::to_string(i), 0}));
const auto e = mustOk(echoCtx->shout(ShoutRequest{"e" + std::to_string(i)}));
EXPECT_EQ(t.timerName, "x-timer");
EXPECT_EQ(e.prefix, "X-ECHO");
}
auto tFut = timerCtx->echoAsync(EchoRequest{"async-t", 30});
auto eFut = echoCtx->shoutAsync(ShoutRequest{"async-e"});
const auto t = tFut.get();
const auto e = eFut.get();
const auto t = mustOk(tFut.get());
const auto e = mustOk(eFut.get());
EXPECT_EQ(t.echoed, "async-t");
EXPECT_EQ(t.timerName, "x-timer");
EXPECT_EQ(e.shouted, "X-ECHO: ASYNC-E");
@ -212,9 +226,9 @@ TEST(TimerE2E, TriplePipeline) {
auto ctx = makeCtx("pipeline");
auto pipeline = std::async(std::launch::async, [&ctx]() {
auto a = ctx->echoAsync(EchoRequest{"A", 20}).get();
auto b = ctx->echoAsync(EchoRequest{a.echoed + "->B", 10}).get();
auto c = ctx->echoAsync(EchoRequest{b.echoed + "->C", 5}).get();
auto a = mustOk(ctx->echoAsync(EchoRequest{"A", 20}).get());
auto b = mustOk(ctx->echoAsync(EchoRequest{a.echoed + "->B", 10}).get());
auto c = mustOk(ctx->echoAsync(EchoRequest{b.echoed + "->C", 5}).get());
return c;
});
@ -224,6 +238,8 @@ TEST(TimerE2E, TriplePipeline) {
}
// Per-thread context create -> one call -> destroy churns the FFI context pool.
// Worker threads avoid gtest assertion macros (not thread-safe) and report via
// the atomic `errors` counter instead.
TEST(TimerE2E, StressShortLivedPerThreadContext) {
constexpr int kThreads = 16;
@ -233,14 +249,13 @@ TEST(TimerE2E, StressShortLivedPerThreadContext) {
for (int t = 0; t < kThreads; ++t) {
workers.emplace_back([&, t] {
try {
auto ctx = makeCtx("short-" + std::to_string(t));
const auto resp = ctx->echo(EchoRequest{"hi", 0});
if (resp.echoed != "hi") ++errors;
if (resp.timerName != "short-" + std::to_string(t)) ++errors;
} catch (const std::exception&) {
++errors;
}
auto ctxRes = MyTimerCtx::create(TimerConfig{"short-" + std::to_string(t)});
if (ctxRes.isErr()) { ++errors; return; }
auto ctx = std::move(ctxRes.value());
const auto resp = ctx->echo(EchoRequest{"hi", 0});
if (resp.isErr()) { ++errors; return; }
if (resp->echoed != "hi") ++errors;
if (resp->timerName != "short-" + std::to_string(t)) ++errors;
});
}
for (auto& w : workers) w.join();
@ -259,13 +274,10 @@ TEST(TimerE2E, StressShortLivedSharedContext) {
for (int t = 0; t < kThreads; ++t) {
workers.emplace_back([&, t] {
try {
const auto resp = shared->echo(EchoRequest{"x" + std::to_string(t), 0});
if (resp.echoed != "x" + std::to_string(t)) ++errors;
if (resp.timerName != "shared-short") ++errors;
} catch (const std::exception&) {
++errors;
}
const auto resp = shared->echo(EchoRequest{"x" + std::to_string(t), 0});
if (resp.isErr()) { ++errors; return; }
if (resp->echoed != "x" + std::to_string(t)) ++errors;
if (resp->timerName != "shared-short") ++errors;
});
}
for (auto& w : workers) w.join();
@ -289,14 +301,17 @@ TEST(TimerE2E, ThreadedHammer) {
for (int t = 0; t < kThreads; ++t) {
workers.emplace_back([&, t] {
auto own = makeCtx("hammer-t" + std::to_string(t));
auto ownRes = MyTimerCtx::create(TimerConfig{"hammer-t" + std::to_string(t)});
if (ownRes.isErr()) { ++errors; return; }
auto own = std::move(ownRes.value());
for (int i = 0; i < kIters; ++i) {
if ((i & 1) == 0) {
const auto r = shared->echo(EchoRequest{"s", 0});
if (r.echoed != "s") ++errors;
if (r.isErr() || r->echoed != "s") ++errors;
} else {
auto f = own->echoAsync(EchoRequest{"a", 1});
if (f.get().echoed != "a") ++errors;
const auto r = f.get();
if (r.isErr() || r->echoed != "a") ++errors;
}
}
});
@ -322,7 +337,7 @@ TEST(TimerE2E, TypedEventFiresAfterEcho) {
[&](const EchoEvent& evt) { evtPromise.set_value(evt); });
ASSERT_NE(handle.id, 0u) << "addOnEchoFiredListener returned zero id";
const auto resp = ctx->echo(EchoRequest{"event-msg", 1});
const auto resp = mustOk(ctx->echo(EchoRequest{"event-msg", 1}));
EXPECT_EQ(resp.echoed, "event-msg");
const auto status = evtFuture.wait_for(std::chrono::seconds(2));
@ -389,54 +404,3 @@ TEST(TimerE2E, RemoveEventListenerStopsDelivery) {
EXPECT_EQ(keptHits.load(), 2);
EXPECT_EQ(removedHits.load(), 1) << "removed listener fired after removeEventListener";
}
// The wildcard `addEventListener` overload receives every event with the
// wire `eventId` pre-extracted plus a `std::span` view over the raw
// envelope bytes. The helper `decodeEventPayload<T>` lifts the payload
// into a typed value.
TEST(TimerE2E, WildcardListenerReceivesEventIdAndDecodesPayload) {
auto ctx = makeCtx("wildcard");
struct Capture {
int retCode;
std::string eventId;
std::size_t envelopeBytes;
std::optional<EchoEvent> decoded;
};
std::mutex mu;
std::vector<Capture> captured;
auto handle = ctx->addEventListener(
[&](int retCode, const std::string& eventId,
std::span<const std::uint8_t> envelope) {
Capture c{retCode, eventId, envelope.size(), std::nullopt};
if (retCode == 0 && eventId == "on_echo_fired") {
EchoEvent evt{};
if (decodeEventPayload(envelope, evt)) {
c.decoded = evt;
}
}
std::lock_guard<std::mutex> lock(mu);
captured.push_back(std::move(c));
});
ASSERT_NE(handle.id, 0u);
ctx->echo(EchoRequest{"hello", 1});
for (int i = 0; i < 200; ++i) {
{
std::lock_guard<std::mutex> lock(mu);
if (!captured.empty()) break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
std::lock_guard<std::mutex> lock(mu);
ASSERT_GE(captured.size(), 1u);
EXPECT_EQ(captured.front().retCode, 0);
EXPECT_EQ(captured.front().eventId, "on_echo_fired");
EXPECT_GT(captured.front().envelopeBytes, 0u);
ASSERT_TRUE(captured.front().decoded.has_value());
EXPECT_EQ(captured.front().decoded->message, "hello");
EXPECT_EQ(captured.front().decoded->echoCount, 1);
}

View File

@ -86,8 +86,8 @@ registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib):
return ok("emitted")
## Setter-thread worker for the registry race regression test. Each
## iteration adds then immediately removes a wildcard listener so a
## TSan-instrumented build can confirm `FFIEventRegistry.lock`
## iteration adds then immediately removes a listener for the dispatched
## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock`
## serialises the cross-thread mutation against dispatch-time
## `snapshotListeners` reads from the FFI thread.
type SetterArgs = tuple
@ -98,7 +98,7 @@ type SetterArgs = tuple
proc setterThreadBody(args: SetterArgs) {.thread.} =
while not args.stop[].load():
let id = addEventListener(
args.ctx[].eventRegistry, WildcardEventName, captureCb, args.target
args.ctx[].eventRegistry, "message_sent", captureCb, args.target
)
discard removeEventListener(args.ctx[].eventRegistry, id)
@ -116,10 +116,9 @@ suite "dispatchFFIEventCbor":
defer:
deinitCallbackData(evt)
# Register the event callback via the same locked helper that the
# codegen-emitted `{libname}_set_event_callback` uses.
# Subscribe to the specific event the request below dispatches.
discard addEventListener(
ctx[].eventRegistry, WildcardEventName, captureCb, addr evt
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
# Trigger the dispatch from the FFI thread; the response callback is
@ -159,7 +158,7 @@ suite "dispatchFFIEvent with seq[byte]":
deinitCallbackData(evt)
discard addEventListener(
ctx[].eventRegistry, WildcardEventName, captureCb, addr evt
ctx[].eventRegistry, "raw_bytes", captureCb, addr evt
)
var rsp: CallbackData
@ -178,8 +177,8 @@ suite "dispatchFFIEvent with seq[byte]":
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
when not defined(gcRefc):
## Skipped under `--mm:refc`: each setter thread grows / shrinks
## `reg.wildcard` (a `seq[FFIEventListener]`) via `addEventListener`,
## Skipped under `--mm:refc`: each setter thread grows / shrinks the
## per-event listener `seq[FFIEventListener]` via `addEventListener`,
## and refc's per-thread GC heap ownership makes cross-thread seq
## buffer reallocation unsafe even when the surrounding lock is held.
## ORC + the FFI thread + tsan (the combo this test was written for)
@ -206,7 +205,7 @@ when not defined(gcRefc):
# (callback, userData) pair — what matters is the cross-thread write
# racing the FFI thread's read, not which pair "wins".
discard addEventListener(
ctx[].eventRegistry, WildcardEventName, captureCb, addr evt
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
const NumSetterThreads = 4

View File

@ -68,7 +68,7 @@ suite "FFIEventRegistry mutation":
let id1 = addEventListener(reg, "evt", tagCb, addr t)
let id2 = addEventListener(reg, "evt", tagCb, addr t)
let id3 = addEventListener(reg, "", tagCb, addr t)
let id3 = addEventListener(reg, "other", tagCb, addr t)
check id1 == 1'u64
check id2 == 2'u64
check id3 == 3'u64
@ -89,7 +89,7 @@ suite "FFIEventRegistry mutation":
check not removeEventListener(reg, 0'u64)
check not removeEventListener(reg, 99'u64)
test "removeEventListener removes from per-event seq and wildcard":
test "removeEventListener removes listeners across distinct events":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
@ -101,18 +101,18 @@ suite "FFIEventRegistry mutation":
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
let id2 = addEventListener(reg, "", tagCb, addr t)
let id2 = addEventListener(reg, "other", tagCb, addr t)
check removeEventListener(reg, id1)
check removeEventListener(reg, id2)
# Second remove of the same id is a no-op.
check not removeEventListener(reg, id1)
let snap = snapshotListeners(reg, "evt")
check snap.len == 0
check snapshotListeners(reg, "evt").len == 0
check snapshotListeners(reg, "other").len == 0
suite "FFIEventRegistry snapshot semantics":
test "snapshot includes both per-event listeners and wildcards":
test "snapshot returns only the listeners for the requested event":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
@ -126,17 +126,17 @@ suite "FFIEventRegistry snapshot semantics":
var c = Tag(name: "c", rec: addr rec)
discard addEventListener(reg, "evt", tagCb, addr a)
discard addEventListener(reg, "other", tagCb, addr b)
discard addEventListener(reg, "", tagCb, addr c)
discard addEventListener(reg, "evt", tagCb, addr b)
discard addEventListener(reg, "other", tagCb, addr c)
let snapEvt = snapshotListeners(reg, "evt")
check snapEvt.len == 2 # listener for "evt" + wildcard
check snapEvt.len == 2 # both listeners for "evt"
let snapOther = snapshotListeners(reg, "other")
check snapOther.len == 2 # listener for "other" + wildcard
check snapOther.len == 1 # only the listener for "other"
let snapUnknown = snapshotListeners(reg, "no-subscriber")
check snapUnknown.len == 1 # only the wildcard
check snapUnknown.len == 0 # no listener for this event
test "snapshot is a copy: post-snapshot mutation does not affect it":
var reg: FFIEventRegistry
@ -161,7 +161,7 @@ suite "FFIEventRegistry snapshot semantics":
check snap[0].id == id1
suite "removeAllEventListeners":
test "drops every listener (per-event and wildcard)":
test "drops every registered listener":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
@ -174,8 +174,8 @@ suite "removeAllEventListeners":
var b = Tag(name: "b", rec: addr rec)
discard addEventListener(reg, "evt", tagCb, addr a)
discard addEventListener(reg, WildcardEventName, tagCb, addr b)
discard addEventListener(reg, "other", tagCb, addr b)
removeAllEventListeners(reg)
check snapshotListeners(reg, "evt").len == 0
check snapshotListeners(reg, WildcardEventName).len == 0
check snapshotListeners(reg, "other").len == 0