mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-04-10 13:53:11 +00:00
* Add pre-check of options used in config Json for logos-delivery-api pre-createNode, treat unrecognized options as error * Collect all unrecognized options and report them at once. * Refactor json config parsing and error detection
198 lines
7.1 KiB
Nim
198 lines
7.1 KiB
Nim
import std/[json, strutils, tables]
|
|
import chronos, chronicles, results, confutils, confutils/std/net, ffi
|
|
import
|
|
waku/factory/waku,
|
|
waku/node/waku_node,
|
|
waku/api/[api, types],
|
|
waku/events/[message_events, health_events],
|
|
tools/confutils/cli_args,
|
|
../declare_lib,
|
|
../json_event
|
|
|
|
# Add JSON serialization for RequestId
|
|
proc `%`*(id: RequestId): JsonNode =
|
|
%($id)
|
|
|
|
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
|
|
proc(configJson: cstring): Future[Result[string, string]] {.async.} =
|
|
## Parse the JSON configuration using fieldPairs approach (WakuNodeConf)
|
|
var conf = defaultWakuNodeConf().valueOr:
|
|
return err("Failed creating default conf: " & error)
|
|
|
|
var jsonNode: JsonNode
|
|
try:
|
|
jsonNode = parseJson($configJson)
|
|
except Exception:
|
|
let exceptionMsg = getCurrentExceptionMsg()
|
|
error "Failed to parse config JSON",
|
|
error = exceptionMsg, configJson = $configJson
|
|
return err(
|
|
"Failed to parse config JSON: " & exceptionMsg & " configJson string: " &
|
|
$configJson
|
|
)
|
|
|
|
var jsonFields: Table[string, (string, JsonNode)]
|
|
for key, value in jsonNode:
|
|
let lowerKey = key.toLowerAscii()
|
|
|
|
if jsonFields.hasKey(lowerKey):
|
|
error "Duplicate configuration option found when normalized to lowercase",
|
|
key = key
|
|
return err(
|
|
"Duplicate configuration option found when normalized to lowercase: '" & key &
|
|
"'"
|
|
)
|
|
|
|
jsonFields[lowerKey] = (key, value)
|
|
|
|
for confField, confValue in fieldPairs(conf):
|
|
let lowerField = confField.toLowerAscii()
|
|
if jsonFields.hasKey(lowerField):
|
|
let (jsonKey, jsonValue) = jsonFields[lowerField]
|
|
let formattedString = ($jsonValue).strip(chars = {'\"'})
|
|
try:
|
|
confValue = parseCmdArg(typeof(confValue), formattedString)
|
|
except Exception:
|
|
return err(
|
|
"Failed to parse field '" & confField & "' from JSON key '" & jsonKey & "': " &
|
|
getCurrentExceptionMsg() & ". Value: " & formattedString
|
|
)
|
|
|
|
jsonFields.del(lowerField)
|
|
|
|
if jsonFields.len > 0:
|
|
var unknownKeys = newSeq[string]()
|
|
for _, (jsonKey, _) in pairs(jsonFields):
|
|
unknownKeys.add(jsonKey)
|
|
error "Unrecognized configuration option(s) found", option = unknownKeys
|
|
return err("Unrecognized configuration option(s) found: " & $unknownKeys)
|
|
|
|
# Create the node
|
|
ctx.myLib[] = (await api.createNode(conf)).valueOr:
|
|
let errMsg = $error
|
|
chronicles.error "CreateNodeRequest failed", err = errMsg
|
|
return err(errMsg)
|
|
|
|
return ok("")
|
|
|
|
proc logosdelivery_destroy(
|
|
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
|
): cint {.dynlib, exportc, cdecl.} =
|
|
initializeLibrary()
|
|
checkParams(ctx, callback, userData)
|
|
|
|
ffi.destroyFFIContext(ctx).isOkOr:
|
|
let msg = "liblogosdelivery error: " & $error
|
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
|
return RET_ERR
|
|
|
|
## always need to invoke the callback although we don't retrieve value to the caller
|
|
callback(RET_OK, nil, 0, userData)
|
|
|
|
return RET_OK
|
|
|
|
proc logosdelivery_create_node(
|
|
configJson: cstring, callback: FFICallback, userData: pointer
|
|
): pointer {.dynlib, exportc, cdecl.} =
|
|
initializeLibrary()
|
|
|
|
if isNil(callback):
|
|
echo "error: missing callback in logosdelivery_create_node"
|
|
return nil
|
|
|
|
var ctx = ffi.createFFIContext[Waku]().valueOr:
|
|
let msg = "Error in createFFIContext: " & $error
|
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
|
return nil
|
|
|
|
ctx.userData = userData
|
|
|
|
ffi.sendRequestToFFIThread(
|
|
ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson)
|
|
).isOkOr:
|
|
let msg = "error in sendRequestToFFIThread: " & $error
|
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
|
# free allocated resources as they won't be available
|
|
ffi.destroyFFIContext(ctx).isOkOr:
|
|
chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation",
|
|
err = $error
|
|
return nil
|
|
|
|
return ctx
|
|
|
|
proc logosdelivery_start_node(
|
|
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
|
) {.ffi.} =
|
|
requireInitializedNode(ctx, "START_NODE"):
|
|
return err(errMsg)
|
|
|
|
# setting up outgoing event listeners
|
|
let sentListener = MessageSentEvent.listen(
|
|
ctx.myLib[].brokerCtx,
|
|
proc(event: MessageSentEvent) {.async: (raises: []).} =
|
|
callEventCallback(ctx, "onMessageSent"):
|
|
$newJsonEvent("message_sent", event),
|
|
).valueOr:
|
|
chronicles.error "MessageSentEvent.listen failed", err = $error
|
|
return err("MessageSentEvent.listen failed: " & $error)
|
|
|
|
let errorListener = MessageErrorEvent.listen(
|
|
ctx.myLib[].brokerCtx,
|
|
proc(event: MessageErrorEvent) {.async: (raises: []).} =
|
|
callEventCallback(ctx, "onMessageError"):
|
|
$newJsonEvent("message_error", event),
|
|
).valueOr:
|
|
chronicles.error "MessageErrorEvent.listen failed", err = $error
|
|
return err("MessageErrorEvent.listen failed: " & $error)
|
|
|
|
let propagatedListener = MessagePropagatedEvent.listen(
|
|
ctx.myLib[].brokerCtx,
|
|
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
|
|
callEventCallback(ctx, "onMessagePropagated"):
|
|
$newJsonEvent("message_propagated", event),
|
|
).valueOr:
|
|
chronicles.error "MessagePropagatedEvent.listen failed", err = $error
|
|
return err("MessagePropagatedEvent.listen failed: " & $error)
|
|
|
|
let receivedListener = MessageReceivedEvent.listen(
|
|
ctx.myLib[].brokerCtx,
|
|
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
|
|
callEventCallback(ctx, "onMessageReceived"):
|
|
$newJsonEvent("message_received", event),
|
|
).valueOr:
|
|
chronicles.error "MessageReceivedEvent.listen failed", err = $error
|
|
return err("MessageReceivedEvent.listen failed: " & $error)
|
|
|
|
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
|
|
ctx.myLib[].brokerCtx,
|
|
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
|
|
callEventCallback(ctx, "onConnectionStatusChange"):
|
|
$newJsonEvent("connection_status_change", event),
|
|
).valueOr:
|
|
chronicles.error "ConnectionStatusChange.listen failed", err = $error
|
|
return err("ConnectionStatusChange.listen failed: " & $error)
|
|
|
|
(await startWaku(addr ctx.myLib[])).isOkOr:
|
|
let errMsg = $error
|
|
chronicles.error "START_NODE failed", err = errMsg
|
|
return err("failed to start: " & errMsg)
|
|
return ok("")
|
|
|
|
proc logosdelivery_stop_node(
|
|
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
|
) {.ffi.} =
|
|
requireInitializedNode(ctx, "STOP_NODE"):
|
|
return err(errMsg)
|
|
|
|
MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
|
|
MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
|
|
MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
|
|
MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
|
|
EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)
|
|
|
|
(await ctx.myLib[].stop()).isOkOr:
|
|
let errMsg = $error
|
|
chronicles.error "STOP_NODE failed", err = errMsg
|
|
return err("failed to stop: " & errMsg)
|
|
return ok("")
|