mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-24 21:08:38 +00:00
libwaku better params validation and a bit more clarity (#3005)
This commit is contained in:
parent
e8a49b76b2
commit
19feb6bd58
2
Makefile
2
Makefile
@ -427,6 +427,7 @@ cwaku_example: | build libwaku
|
||||
./examples/cbindings/base64.c \
|
||||
-lwaku -Lbuild/ \
|
||||
-pthread -ldl -lm \
|
||||
-lnegentropy -Lvendor/negentropy/cpp/ \
|
||||
-lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \
|
||||
-lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \
|
||||
vendor/nim-libbacktrace/libbacktrace_wrapper.o \
|
||||
@ -439,6 +440,7 @@ cppwaku_example: | build libwaku
|
||||
./examples/cpp/base64.cpp \
|
||||
-lwaku -Lbuild/ \
|
||||
-pthread -ldl -lm \
|
||||
-lnegentropy -Lvendor/negentropy/cpp/ \
|
||||
-lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \
|
||||
-lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \
|
||||
vendor/nim-libbacktrace/libbacktrace_wrapper.o \
|
||||
|
@ -1,3 +1,11 @@
|
||||
import ./waku_thread/waku_thread
|
||||
|
||||
type WakuCallBack* = proc(
|
||||
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].}
|
||||
|
||||
template checkLibwakuParams*(ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer) =
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
|
@ -5,7 +5,7 @@
|
||||
when defined(linux):
|
||||
{.passl: "-Wl,-soname,libwaku.so".}
|
||||
|
||||
import std/[json, sequtils, atomics, times, strformat, options, atomics, strutils, os]
|
||||
import std/[json, sequtils, atomics, strformat, options, atomics]
|
||||
import chronicles, chronos
|
||||
import
|
||||
waku/common/base64,
|
||||
@ -52,7 +52,7 @@ template foreignThreadGc(body: untyped) =
|
||||
when declared(tearDownForeignThreadGc):
|
||||
tearDownForeignThreadGc()
|
||||
|
||||
proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =
|
||||
proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler =
|
||||
return proc(
|
||||
pubsubTopic: PubsubTopic, msg: WakuMessage
|
||||
): Future[system.void] {.async.} =
|
||||
@ -144,10 +144,9 @@ proc waku_new(
|
||||
return ctx
|
||||
|
||||
proc waku_destroy(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
waku_thread.stopWakuThread(ctx).isOkOr:
|
||||
foreignThreadGc:
|
||||
@ -158,12 +157,9 @@ proc waku_destroy(
|
||||
return RET_OK
|
||||
|
||||
proc waku_version(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
foreignThreadGc:
|
||||
callback(
|
||||
@ -176,13 +172,13 @@ proc waku_version(
|
||||
return RET_OK
|
||||
|
||||
proc waku_set_event_callback(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
) {.dynlib, exportc.} =
|
||||
ctx[].eventCallback = cast[pointer](callback)
|
||||
ctx[].eventUserData = userData
|
||||
|
||||
proc waku_content_topic(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
appName: cstring,
|
||||
appVersion: cuint,
|
||||
contentTopicName: cstring,
|
||||
@ -192,10 +188,7 @@ proc waku_content_topic(
|
||||
): cint {.dynlib, exportc.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
|
||||
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let appStr = appName.alloc()
|
||||
let ctnStr = contentTopicName.alloc()
|
||||
@ -213,14 +206,11 @@ proc waku_content_topic(
|
||||
return RET_OK
|
||||
|
||||
proc waku_pubsub_topic(
|
||||
ctx: ptr Context, topicName: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, topicName: cstring, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc, cdecl.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
|
||||
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let topicNameStr = topicName.alloc()
|
||||
|
||||
@ -234,14 +224,11 @@ proc waku_pubsub_topic(
|
||||
return RET_OK
|
||||
|
||||
proc waku_default_pubsub_topic(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
|
||||
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
callback(
|
||||
RET_OK,
|
||||
@ -253,7 +240,7 @@ proc waku_default_pubsub_topic(
|
||||
return RET_OK
|
||||
|
||||
proc waku_relay_publish(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
jsonWakuMessage: cstring,
|
||||
timeoutMs: cuint,
|
||||
@ -262,10 +249,7 @@ proc waku_relay_publish(
|
||||
): cint {.dynlib, exportc, cdecl.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
|
||||
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let jwm = jsonWakuMessage.alloc()
|
||||
var jsonMessage: JsonMessage
|
||||
@ -315,9 +299,9 @@ proc waku_relay_publish(
|
||||
return RET_OK
|
||||
|
||||
proc waku_start(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
## TODO: handle the error
|
||||
discard waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -326,9 +310,10 @@ proc waku_start(
|
||||
)
|
||||
|
||||
proc waku_stop(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
## TODO: handle the error
|
||||
discard waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -337,9 +322,12 @@ proc waku_stop(
|
||||
)
|
||||
|
||||
proc waku_relay_subscribe(
|
||||
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let pst = pubSubTopic.alloc()
|
||||
var cb = relayEventCallback(ctx)
|
||||
@ -360,9 +348,12 @@ proc waku_relay_subscribe(
|
||||
return RET_OK
|
||||
|
||||
proc waku_relay_unsubscribe(
|
||||
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let pst = pubSubTopic.alloc()
|
||||
|
||||
@ -385,9 +376,12 @@ proc waku_relay_unsubscribe(
|
||||
return RET_OK
|
||||
|
||||
proc waku_relay_get_num_connected_peers(
|
||||
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let pst = pubSubTopic.alloc()
|
||||
defer:
|
||||
@ -414,9 +408,12 @@ proc waku_relay_get_num_connected_peers(
|
||||
return RET_OK
|
||||
|
||||
proc waku_relay_get_num_peers_in_mesh(
|
||||
ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let pst = pubSubTopic.alloc()
|
||||
defer:
|
||||
@ -443,16 +440,13 @@ proc waku_relay_get_num_peers_in_mesh(
|
||||
return RET_OK
|
||||
|
||||
proc waku_lightpush_publish(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
pubSubTopic: cstring,
|
||||
jsonWakuMessage: cstring,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc, cdecl.} =
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let jwm = jsonWakuMessage.alloc()
|
||||
let pst = pubSubTopic.alloc()
|
||||
@ -498,13 +492,13 @@ proc waku_lightpush_publish(
|
||||
return RET_OK
|
||||
|
||||
proc waku_connect(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
peerMultiAddr: cstring,
|
||||
timeoutMs: cuint,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let connRes = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -521,22 +515,19 @@ proc waku_connect(
|
||||
return RET_OK
|
||||
|
||||
proc waku_store_query(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
jsonQuery: cstring,
|
||||
peerAddr: cstring,
|
||||
timeoutMs: cint,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let sendReqRes = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
RequestType.STORE,
|
||||
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs, callback),
|
||||
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs),
|
||||
)
|
||||
|
||||
if sendReqRes.isErr():
|
||||
@ -549,9 +540,9 @@ proc waku_store_query(
|
||||
return RET_OK
|
||||
|
||||
proc waku_listen_addresses(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let connRes = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -568,14 +559,14 @@ proc waku_listen_addresses(
|
||||
return RET_OK
|
||||
|
||||
proc waku_dns_discovery(
|
||||
ctx: ptr Context,
|
||||
ctx: ptr WakuContext,
|
||||
entTreeUrl: cstring,
|
||||
nameDnsServer: cstring,
|
||||
timeoutMs: cint,
|
||||
callback: WakuCallBack,
|
||||
userData: pointer,
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let bootstrapPeers = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -593,11 +584,11 @@ proc waku_dns_discovery(
|
||||
return RET_OK
|
||||
|
||||
proc waku_discv5_update_bootnodes(
|
||||
ctx: ptr Context, bootnodes: cstring, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
## Updates the bootnode list used for discovering new peers via DiscoveryV5
|
||||
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let resp = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -615,9 +606,9 @@ proc waku_discv5_update_bootnodes(
|
||||
return RET_OK
|
||||
|
||||
proc waku_get_my_enr(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let connRes = waku_thread.sendRequestToWakuThread(
|
||||
ctx,
|
||||
@ -634,9 +625,9 @@ proc waku_get_my_enr(
|
||||
return RET_OK
|
||||
|
||||
proc waku_start_discv5(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let resp = waku_thread.sendRequestToWakuThread(
|
||||
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest()
|
||||
@ -651,9 +642,9 @@ proc waku_start_discv5(
|
||||
return RET_OK
|
||||
|
||||
proc waku_stop_discv5(
|
||||
ctx: ptr Context, callback: WakuCallBack, userData: pointer
|
||||
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||
): cint {.dynlib, exportc.} =
|
||||
ctx[].userData = userData
|
||||
checkLibwakuParams(ctx, callback, userData)
|
||||
|
||||
let resp = waku_thread.sendRequestToWakuThread(
|
||||
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest()
|
||||
|
@ -3,7 +3,6 @@ import chronos, results
|
||||
import
|
||||
../../../../../waku/factory/waku,
|
||||
../../../../alloc,
|
||||
../../../../callback,
|
||||
../../../../../waku/waku_core/peers,
|
||||
../../../../../waku/waku_core/time,
|
||||
../../../../../waku/waku_core/message/digest,
|
||||
@ -18,7 +17,6 @@ type JsonStoreQueryRequest* = object
|
||||
jsonQuery: cstring
|
||||
peerAddr: cstring
|
||||
timeoutMs: cint
|
||||
storeCallback: WakuCallBack
|
||||
|
||||
type StoreRequest* = object
|
||||
operation: StoreReqType
|
||||
@ -99,13 +97,11 @@ proc createShared*(
|
||||
jsonQuery: cstring,
|
||||
peerAddr: cstring,
|
||||
timeoutMs: cint,
|
||||
storeCallback: WakuCallBack = nil,
|
||||
): ptr type T =
|
||||
var ret = createShared(T)
|
||||
ret[].timeoutMs = timeoutMs
|
||||
ret[].jsonQuery = jsonQuery.alloc()
|
||||
ret[].peerAddr = peerAddr.alloc()
|
||||
ret[].storeCallback = storeCallback
|
||||
return ret
|
||||
|
||||
proc destroyShared(self: ptr JsonStoreQueryRequest) =
|
||||
|
@ -9,8 +9,8 @@ import
|
||||
./inter_thread_communication/waku_thread_request,
|
||||
./inter_thread_communication/waku_thread_response
|
||||
|
||||
type Context* = object
|
||||
thread: Thread[(ptr Context)]
|
||||
type WakuContext* = object
|
||||
thread: Thread[(ptr WakuContext)]
|
||||
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
|
||||
reqSignal: ThreadSignalPtr
|
||||
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
|
||||
@ -26,7 +26,7 @@ const versionString = "version / git commit hash: " & waku.git_version
|
||||
# TODO: this should be part of the context so multiple instances can be executed
|
||||
var running: Atomic[bool]
|
||||
|
||||
proc runWaku(ctx: ptr Context) {.async.} =
|
||||
proc runWaku(ctx: ptr WakuContext) {.async.} =
|
||||
## This is the worker body. This runs the Waku node
|
||||
## and attends library user requests (stop, connect_to, etc.)
|
||||
info "Starting Waku", version = versionString
|
||||
@ -49,14 +49,14 @@ proc runWaku(ctx: ptr Context) {.async.} =
|
||||
discard ctx.respChannel.trySend(threadSafeResp)
|
||||
discard ctx.respSignal.fireSync()
|
||||
|
||||
proc run(ctx: ptr Context) {.thread.} =
|
||||
proc run(ctx: ptr WakuContext) {.thread.} =
|
||||
## Launch waku worker
|
||||
waitFor runWaku(ctx)
|
||||
|
||||
proc createWakuThread*(): Result[ptr Context, string] =
|
||||
proc createWakuThread*(): Result[ptr WakuContext, string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the Waku working thread.
|
||||
var ctx = createShared(Context, 1)
|
||||
var ctx = createShared(WakuContext, 1)
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||
ctx.respSignal = ThreadSignalPtr.new().valueOr:
|
||||
@ -74,7 +74,7 @@ proc createWakuThread*(): Result[ptr Context, string] =
|
||||
|
||||
return ok(ctx)
|
||||
|
||||
proc stopWakuThread*(ctx: ptr Context): Result[void, string] =
|
||||
proc stopWakuThread*(ctx: ptr WakuContext): Result[void, string] =
|
||||
running.store(false)
|
||||
let fireRes = ctx.reqSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
@ -86,7 +86,7 @@ proc stopWakuThread*(ctx: ptr Context): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc sendRequestToWakuThread*(
|
||||
ctx: ptr Context, reqType: RequestType, reqContent: pointer
|
||||
ctx: ptr WakuContext, reqType: RequestType, reqContent: pointer
|
||||
): Result[string, string] =
|
||||
let req = InterThreadRequest.createShared(reqType, reqContent)
|
||||
## Sending the request
|
||||
|
Loading…
x
Reference in New Issue
Block a user