unify create start stop and destroy

This commit is contained in:
Ivan FB 2026-06-24 12:40:47 +02:00
parent b279533214
commit 791e83aa93
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
14 changed files with 89 additions and 214 deletions

View File

@ -348,7 +348,7 @@ int main(int argc, char **argv)
cfgNode.storeRetentionPolicy,
cfgNode.storeMaxNumDbConnections);
ctx = waku_new(jsonConfig, event_handler, userData);
ctx = logosdelivery_create_node(jsonConfig, event_handler, userData);
waitForCallback();
WAKU_CALL(waku_default_pubsub_topic(ctx, print_default_pubsub_topic, userData));
@ -359,7 +359,7 @@ int main(int argc, char **argv)
logosdelivery_set_event_callback(ctx, on_event_received, userData);
waku_start(ctx, event_handler, userData);
logosdelivery_start_node(ctx, event_handler, userData);
waitForCallback();
WAKU_CALL(waku_listen_addresses(ctx, event_handler, userData));

View File

@ -278,9 +278,9 @@ int main(int argc, char **argv)
cfgNode.port);
void *ctx =
waku_new(jsonConfig,
logosdelivery_create_node(jsonConfig,
cify([](const char *msg, size_t len)
{ std::cout << "waku_new feedback: " << msg << std::endl; }),
{ std::cout << "logosdelivery_create_node feedback: " << msg << std::endl; }),
nullptr);
waitForCallback();
@ -317,7 +317,7 @@ int main(int argc, char **argv)
{ event_handler(msg, len); }),
nullptr);
WAKU_CALL(waku_start(ctx,
WAKU_CALL(logosdelivery_start_node(ctx,
cify([&](const char *msg, size_t len)
{ event_handler(msg, len); }),
nullptr));

View File

@ -71,20 +71,20 @@ package main
static void* cGoWakuNew(const char* configJson, void* resp) {
// We pass NULL because we are not interested in retrieving data from this callback
void* ret = waku_new(configJson, (FFICallBack) callback, resp);
void* ret = logosdelivery_create_node(configJson, (FFICallBack) callback, resp);
return ret;
}
static void cGoWakuStart(void* wakuCtx, void* resp) {
WAKU_CALL(waku_start(wakuCtx, (FFICallBack) callback, resp));
WAKU_CALL(logosdelivery_start_node(wakuCtx, (FFICallBack) callback, resp));
}
static void cGoWakuStop(void* wakuCtx, void* resp) {
WAKU_CALL(waku_stop(wakuCtx, (FFICallBack) callback, resp));
WAKU_CALL(logosdelivery_stop_node(wakuCtx, (FFICallBack) callback, resp));
}
static void cGoWakuDestroy(void* wakuCtx, void* resp) {
WAKU_CALL(waku_destroy(wakuCtx, (FFICallBack) callback, resp));
WAKU_CALL(logosdelivery_destroy(wakuCtx, (FFICallBack) callback, resp));
}
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {

View File

@ -240,11 +240,11 @@ actor WakuActor {
print("[WakuActor] Unsubscribed from filter")
// Stop
_ = await self.callWakuSync { waku_stop(ctxToStop, WakuActor.syncCallback, $0) }
_ = await self.callWakuSync { logosdelivery_stop_node(ctxToStop, WakuActor.syncCallback, $0) }
print("[WakuActor] Node stopped")
// Destroy
_ = await self.callWakuSync { waku_destroy(ctxToStop, WakuActor.syncCallback, $0) }
_ = await self.callWakuSync { logosdelivery_destroy(ctxToStop, WakuActor.syncCallback, $0) }
print("[WakuActor] Node destroyed")
}
}
@ -319,13 +319,13 @@ actor WakuActor {
}
"""
// Create node - waku_new is special, it returns the context directly
// Create node - logosdelivery_create_node is special, it returns the context directly
let createResult = await withCheckedContinuation { (continuation: CheckedContinuation<(ctx: UnsafeMutableRawPointer?, success: Bool, result: String?), Never>) in
let callbackCtx = CallbackContext()
let userDataPtr = Unmanaged.passRetained(callbackCtx).toOpaque()
// Set up a simple callback for waku_new
let newCtx = waku_new(config, { ret, msg, len, userData in
// Set up a simple callback for logosdelivery_create_node
let newCtx = logosdelivery_create_node(config, { ret, msg, len, userData in
guard let userData = userData else { return }
let context = Unmanaged<CallbackContext>.fromOpaque(userData).takeUnretainedValue()
context.success = (ret == RET_OK)
@ -354,7 +354,7 @@ actor WakuActor {
// Start node
let startResult = await callWakuSync { userData in
waku_start(self.ctx, WakuActor.syncCallback, userData)
logosdelivery_start_node(self.ctx, WakuActor.syncCallback, userData)
}
guard startResult.success else {

View File

@ -123,7 +123,7 @@ class WakuModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMo
val configStr = stringifyReadableMap(config)
val response = wakuNew(configStr)
if (response.error) {
promise.reject("waku_new", response.errorMessage)
promise.reject("logosdelivery_create_node", response.errorMessage)
} else {
// With this we just indicate to waku_ffi that we have registered a
// closure, for this wakuPtr. Later once a message is received the
@ -140,7 +140,7 @@ class WakuModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMo
val wakuPtr = BigInteger(ctx).toLong()
val response = wakuStart(wakuPtr)
if (response.error) {
promise.reject("waku_start", response.message)
promise.reject("logosdelivery_start_node", response.message)
} else {
promise.resolve(null)
}
@ -162,7 +162,7 @@ class WakuModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMo
val wakuPtr = BigInteger(ctx).toLong()
val response = wakuStop(wakuPtr)
if (response.error) {
promise.reject("waku_stop", response.message)
promise.reject("logosdelivery_stop_node", response.message)
} else {
promise.resolve(null)
}
@ -173,7 +173,7 @@ class WakuModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMo
val wakuPtr = BigInteger(ctx).toLong()
val response = wakuDestroy(wakuPtr)
if (response.error) {
promise.reject("waku_destroy", response.message)
promise.reject("logosdelivery_destroy", response.message)
} else {
promise.resolve(null)
}

View File

@ -184,7 +184,7 @@ jobject Java_com_mobile_WakuModule_wakuNew(JNIEnv *env, jobject thiz,
jstring configJson) {
const char *config = (*env)->GetStringUTFChars(env, configJson, 0);
cb_result *result = NULL;
void *wakuPtr = waku_new(config, on_response, (void *)&result);
void *wakuPtr = logosdelivery_create_node(config, on_response, (void *)&result);
jobject response = to_jni_ptr(env, result, wakuPtr);
(*env)->ReleaseStringUTFChars(env, configJson, config);
free_cb_result(result);
@ -194,7 +194,7 @@ jobject Java_com_mobile_WakuModule_wakuNew(JNIEnv *env, jobject thiz,
jobject Java_com_mobile_WakuModule_wakuStart(JNIEnv *env, jobject thiz,
jlong wakuPtr) {
cb_result *result = NULL;
waku_start((void *)wakuPtr, on_response, &result);
logosdelivery_start_node((void *)wakuPtr, on_response, &result);
jobject response = to_jni_result(env, result);
free_cb_result(result);
return response;
@ -212,7 +212,7 @@ jobject Java_com_mobile_WakuModule_wakuVersion(JNIEnv *env, jobject thiz,
jobject Java_com_mobile_WakuModule_wakuStop(JNIEnv *env, jobject thiz,
jlong wakuPtr) {
cb_result *result = NULL;
waku_stop((void *)wakuPtr, on_response, &result);
logosdelivery_stop_node((void *)wakuPtr, on_response, &result);
jobject response = to_jni_result(env, result);
free_cb_result(result);
return response;
@ -221,7 +221,7 @@ jobject Java_com_mobile_WakuModule_wakuStop(JNIEnv *env, jobject thiz,
jobject Java_com_mobile_WakuModule_wakuDestroy(JNIEnv *env, jobject thiz,
jlong wakuPtr) {
cb_result *result = NULL;
waku_destroy((void *)wakuPtr, on_response, &result);
logosdelivery_destroy((void *)wakuPtr, on_response, &result);
jobject response = to_jni_result(env, result);
free_cb_result(result);
return response;

View File

@ -200,7 +200,7 @@ static napi_value WakuNew(napi_env env, napi_callback_info info) {
str_size = str_size + 1;
napi_get_value_string_utf8(env, args[0], jsonConfig, str_size, &str_size_read);
ctx = waku_new(jsonConfig, event_handler, userData);
ctx = logosdelivery_create_node(jsonConfig, event_handler, userData);
free(jsonConfig);
@ -290,7 +290,7 @@ static napi_value WakuSetEventCallback(napi_env env, napi_callback_info info) {
}
static napi_value WakuStart(napi_env env, napi_callback_info info) {
waku_start(ctx, event_handler, userData);
logosdelivery_start_node(ctx, event_handler, userData);
return NULL;
}

View File

@ -53,7 +53,7 @@ parser.add_argument('--peer', dest='peer', default="",
args = parser.parse_args()
# The next 'json_config' is the item passed to the 'waku_new'.
# The next 'json_config' is the item passed to the 'logosdelivery_create_node'.
json_config = "{ \
\"host\": \"%s\", \
\"port\": %d, \
@ -68,16 +68,16 @@ json_config = "{ \
callback_type = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_size_t)
# Node creation
libwaku.waku_new.restype = ctypes.c_void_p
libwaku.waku_new.argtypes = [ctypes.c_char_p,
libwaku.logosdelivery_create_node.restype = ctypes.c_void_p
libwaku.logosdelivery_create_node.argtypes = [ctypes.c_char_p,
callback_type,
ctypes.c_void_p]
ctx = libwaku.waku_new(bytes(json_config, 'utf-8'),
ctx = libwaku.logosdelivery_create_node(bytes(json_config, 'utf-8'),
callback_type(
#onErrCb
lambda ret, msg, len:
print("Error calling waku_new: %s",
print("Error calling logosdelivery_create_node: %s",
msg.decode('utf-8'))
),
ctypes.c_void_p(0))
@ -115,12 +115,12 @@ libwaku.logosdelivery_set_event_callback.argtypes = [callback_type, ctypes.c_voi
libwaku.logosdelivery_set_event_callback(callback, ctypes.c_void_p(0))
# Start the node
libwaku.waku_start.argtypes = [ctypes.c_void_p,
libwaku.logosdelivery_start_node.argtypes = [ctypes.c_void_p,
callback_type,
ctypes.c_void_p]
libwaku.waku_start(ctx,
libwaku.logosdelivery_start_node(ctx,
callback_type(lambda ret, msg, len:
print("Error in waku_start: %s" %
print("Error in logosdelivery_start_node: %s" %
msg.decode('utf-8'))),
ctypes.c_void_p(0))

View File

@ -25,7 +25,7 @@ public:
WakuHandler() : QObject(), ctx(nullptr) {}
void initialize(const QString& jsonConfig, WakuCallBack event_handler, void* userData) {
ctx = waku_new(jsonConfig.toUtf8().constData(), WakuCallBack(event_handler), userData);
ctx = logosdelivery_create_node(jsonConfig.toUtf8().constData(), WakuCallBack(event_handler), userData);
logosdelivery_set_event_callback(ctx, on_event_received, userData);
qDebug() << "Waku context initialized, ready to start.";
@ -33,7 +33,7 @@ public:
Q_INVOKABLE void start() {
if (ctx) {
waku_start(ctx, event_handler, nullptr);
logosdelivery_start_node(ctx, event_handler, nullptr);
qDebug() << "Waku start called with event_handler and userData.";
} else {
qDebug() << "Context is not initialized in start.";
@ -42,7 +42,7 @@ public:
Q_INVOKABLE void stop() {
if (ctx) {
waku_stop(ctx, event_handler, nullptr);
logosdelivery_stop_node(ctx, event_handler, nullptr);
qDebug() << "Waku stop called with event_handler and userData.";
} else {
qDebug() << "Context is not initialized in stop.";

View File

@ -6,7 +6,7 @@ use std::{slice, thread, time};
pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void);
extern "C" {
pub fn waku_new(
pub fn logosdelivery_create_node(
config_json: *const u8,
cb: FFICallBack,
user_data: *const c_void,
@ -14,7 +14,7 @@ extern "C" {
pub fn waku_version(ctx: *const c_void, cb: FFICallBack, user_data: *const c_void) -> c_int;
pub fn waku_start(ctx: *const c_void, cb: FFICallBack, user_data: *const c_void) -> c_int;
pub fn logosdelivery_start_node(ctx: *const c_void, cb: FFICallBack, user_data: *const c_void) -> c_int;
pub fn waku_default_pubsub_topic(
ctx: *mut c_void,
@ -60,11 +60,11 @@ fn main() {
unsafe {
// Create the waku node
let closure = |ret: i32, data: &str| {
println!("Ret {ret}. waku_new closure called {data}");
println!("Ret {ret}. logosdelivery_create_node closure called {data}");
};
let cb = get_trampoline(&closure);
let config_json_str = CString::new(config_json).unwrap();
let ctx = waku_new(
let ctx = logosdelivery_create_node(
config_json_str.as_ptr() as *const u8,
cb,
&closure as *const _ as *const c_void,
@ -99,10 +99,10 @@ fn main() {
// Start the Waku node
let closure = |ret: i32, data: &str| {
println!("Ret {ret}. waku_start closure called {data}");
println!("Ret {ret}. logosdelivery_start_node closure called {data}");
};
let cb = get_trampoline(&closure);
let _ret = waku_start(ctx, cb, &closure as *const _ as *const c_void);
let _ret = logosdelivery_start_node(ctx, cb, &closure as *const _ as *const c_void);
}
loop {

View File

@ -1,82 +0,0 @@
import logos_delivery/waku/compat/option_valueor
import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net, ffi
import
logos_delivery/waku/node/peer_manager/peer_manager,
tools/confutils/cli_args,
logos_delivery/waku/factory/waku,
logos_delivery/waku/factory/node_factory,
logos_delivery/waku/factory/app_callbacks,
logos_delivery/waku/rest_api/endpoint/builder,
library/declare_lib
proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[LogosDelivery, string]] {.async.} =
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating node: " & error)
var errorResp: string
var jsonNode: JsonNode
try:
jsonNode = parseJson($configJson)
except Exception:
return err(
"exception in createWaku when calling parseJson: " & getCurrentExceptionMsg() &
" configJson string: " & $configJson
)
for confField, confValue in fieldPairs(conf):
if jsonNode.contains(confField):
# Make sure string doesn't contain the leading or trailing " character
let formattedString = ($jsonNode[confField]).strip(chars = {'\"'})
# Override conf field with the value set in the json-string
try:
confValue = parseCmdArg(typeof(confValue), formattedString)
except Exception:
return err(
"exception in createWaku when parsing configuration. exc: " &
getCurrentExceptionMsg() & ". string that could not be parsed: " &
formattedString & ". expected type: " & $typeof(confValue)
)
# Don't send relay app callbacks if relay is disabled
if not conf.relay and not appCallbacks.isNil():
appCallbacks.relayHandler = nil
appCallbacks.topicHealthChangeHandler = nil
conf.rest = false ## libwaku never runs the REST server
let logosRes = (await LogosDelivery.new(conf, appCallbacks)).valueOr:
error "LogosDelivery initialization failed", error = error
return err("Failed setting up LogosDelivery: " & $error)
return ok(logosRes)
registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(
configJson: cstring, appCallbacks: AppCallbacks
): Future[Result[string, string]] {.async.} =
ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr:
error "CreateNodeWithCallbacksRequest failed", error = error
return err($error)
return ok("")
proc waku_start(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].start()).isOkOr:
error "START_NODE failed", error = error
return err("failed to start: " & $error)
return ok("")
proc waku_stop(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].stop()).isOkOr:
error "STOP_NODE failed", error = error
return err("failed to stop: " & $error)
return ok("")

View File

@ -8,7 +8,10 @@ import
logos_delivery,
logos_delivery/waku/factory/waku,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/node/peer_manager/peer_manager,
logos_delivery/waku/node/health_monitor/health_status,
logos_delivery/waku/node/health_monitor/topic_health,
logos_delivery/waku/node/health_monitor/connection_status,
../logos_delivery/waku/factory/app_callbacks,
./events/json_message_event,
./events/json_topic_health_change_event,
@ -17,43 +20,12 @@ import
./declare_lib
################################################################################
## Include different APIs, i.e. all procs with {.ffi.} pragma
include
./logos_delivery_api/node_api,
./logos_delivery_api/messaging_api,
./logos_delivery_api/debug_api,
./kernel_api/peer_manager_api,
./kernel_api/discovery_api,
./kernel_api/node_lifecycle_api,
./kernel_api/debug_node_api,
./kernel_api/ping_api,
./kernel_api/protocols/relay_api,
./kernel_api/protocols/store_api,
./kernel_api/protocols/lightpush_api,
./kernel_api/protocols/filter_api
################################################################################
### Exported procs (former libwaku API)
proc waku_new(
configJson: cstring, callback: FFICallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary()
## Creates a new instance of the WakuNode.
if isNil(callback):
echo "error: missing callback in waku_new"
return nil
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = ffi.createFFIContext[LogosDelivery]().valueOr:
let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
## Shared FFI event wiring
proc buildAppCallbacks(ctx: ptr FFIContext[LogosDelivery]): AppCallbacks =
## Builds the libp2p-level callbacks that bridge node events onto the FFI
## event callback. Shared by the single create_node entry point so both the
## stable (messaging) and kernel (waku_*) header surfaces get the same wiring.
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
@ -74,40 +46,25 @@ proc waku_new(
callEventCallback(ctx, "onConnectionStatusChange"):
$JsonConnectionStatusChangeEvent.new(status)
let appCallbacks = AppCallbacks(
return AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx),
connectionStatusChangeHandler: onConnectionStatusChange(ctx),
)
ffi.sendRequestToFFIThread(
ctx,
CreateNodeWithCallbacksRequest.ffiNewReq(
callback, userData, configJson, appCallbacks
),
).isOkOr:
let msg = "error in sendRequestToFFIThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
################################################################################
## Include different APIs, i.e. all procs with {.ffi.} pragma
return ctx
proc waku_destroy(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)
ffi.destroyFFIContext(ctx).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)
return RET_OK
# ### End of exported procs
# ################################################################################
include
./logos_delivery_api/node_api,
./logos_delivery_api/messaging_api,
./logos_delivery_api/debug_api,
./kernel_api/peer_manager_api,
./kernel_api/discovery_api,
./kernel_api/debug_node_api,
./kernel_api/ping_api,
./kernel_api/protocols/relay_api,
./kernel_api/protocols/store_api,
./kernel_api/protocols/lightpush_api,
./kernel_api/protocols/filter_api

View File

@ -26,26 +26,11 @@ extern "C"
{
#endif
// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
// Returns a pointer to the Context needed by the rest of the API functions.
void *waku_new(
const char *configJson,
FFICallBack callback,
void *userData);
int waku_start(void *ctx,
FFICallBack callback,
void *userData);
int waku_stop(void *ctx,
FFICallBack callback,
void *userData);
// Destroys an instance of a waku node created with waku_new
int waku_destroy(void *ctx,
FFICallBack callback,
void *userData);
// NOTE: node lifecycle (create / start / stop / destroy) is unified and lives
// only in the stable header. Use logosdelivery_create_node,
// logosdelivery_start_node, logosdelivery_stop_node and logosdelivery_destroy
// (declared in liblogosdelivery.h, included above) regardless of whether you
// drive the node through the messaging surface or this kernel API.
int waku_version(void *ctx,
FFICallBack callback,

View File

@ -5,6 +5,7 @@ import
logos_delivery/waku/node/waku_node,
logos_delivery/waku/api/[api, types],
logos_delivery/waku/events/[message_events, health_events],
logos_delivery/waku/factory/app_callbacks,
tools/confutils/conf_from_json,
../declare_lib,
../json_event
@ -14,13 +15,25 @@ proc `%`*(id: RequestId): JsonNode =
%($id)
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(configJson: cstring): Future[Result[string, string]] {.async.} =
let conf = parseNodeConfFromJson($configJson).valueOr:
proc(
configJson: cstring, appCallbacks: AppCallbacks
): Future[Result[string, string]] {.async.} =
# liblogosdelivery JSON semantics: case-insensitive keys, reject unknown ones.
var conf = parseNodeConfFromJson($configJson).valueOr:
error "Failed to assemble WakuNodeConf from JSON",
error = error, configJson = $configJson
return err("failed parseNodeConfFromJson " & error)
ctx.myLib[] = (await LogosDelivery.new(conf)).valueOr:
# The REST server is a CLI-only surface; the FFI library never runs it.
conf.rest = false
# Don't forward relay callbacks if relay is disabled.
let callbacks = cast[AppCallbacks](appCallbacks)
if not conf.relay and not callbacks.isNil():
callbacks.relayHandler = nil
callbacks.topicHealthChangeHandler = nil
ctx.myLib[] = (await LogosDelivery.new(conf, callbacks)).valueOr:
let errMsg = $error
chronicles.error "CreateNodeRequest failed", err = errMsg
return err(errMsg)
@ -59,8 +72,10 @@ proc logosdelivery_create_node(
ctx.userData = userData
let appCallbacks = buildAppCallbacks(ctx)
ffi.sendRequestToFFIThread(
ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson)
ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson, appCallbacks)
).isOkOr:
let msg = "error in sendRequestToFFIThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)