diff --git a/Makefile b/Makefile index 4fafd6310..8f98e90bd 100644 --- a/Makefile +++ b/Makefile @@ -469,6 +469,7 @@ logosdelivery_example: | build liblogosdelivery ifeq ($(detected_OS),Darwin) gcc -o build/$@ \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ @@ -476,6 +477,7 @@ ifeq ($(detected_OS),Darwin) else ifeq ($(detected_OS),Linux) gcc -o build/$@ \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ @@ -483,6 +485,7 @@ else ifeq ($(detected_OS),Linux) else ifeq ($(detected_OS),Windows) gcc -o build/$@.exe \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ diff --git a/liblogosdelivery/examples/json_utils.c b/liblogosdelivery/examples/json_utils.c new file mode 100644 index 000000000..8b33bb648 --- /dev/null +++ b/liblogosdelivery/examples/json_utils.c @@ -0,0 +1,96 @@ +#include "json_utils.h" +#include +#include + +const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return NULL; + } + + start += strlen(searchStr); + const char *end = strchr(start, '"'); + if (!end) { + return NULL; + } + + size_t len = end - start; + if (len >= bufSize) { + len = bufSize - 1; + } + + memcpy(buffer, start, len); + buffer[len] = '\0'; + + return buffer; +} + +const char* extract_json_object(const char *json, const char *field, size_t *outLen) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":{", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return NULL; + } + + // Advance to the opening brace + start = strchr(start, '{'); + if (!start) { + return NULL; + } + + // Find the matching closing brace (handles nested braces) + int depth = 0; + const char *p = start; + while (*p) { + if (*p == '{') depth++; + else if (*p == '}') { + depth--; + if (depth == 0) { + *outLen = (size_t)(p - start + 1); + return start; + } + } + p++; + } + return NULL; +} + +int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":[", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return -1; + } + + // Advance to the opening bracket + start = strchr(start, '['); + if (!start) { + return -1; + } + start++; // skip '[' + + size_t pos = 0; + const char *p = start; + while (*p && *p != ']' && pos < bufSize - 1) { + // Skip whitespace and commas + while (*p == ' ' || *p == ',' || *p == '\n' || *p == '\r' || *p == '\t') p++; + if (*p == ']') break; + + // Parse integer + int val = 0; + while (*p >= '0' && *p <= '9') { + val = val * 10 + (*p - '0'); + p++; + } + buffer[pos++] = (char)val; + } + buffer[pos] = '\0'; + return (int)pos; +} diff --git a/liblogosdelivery/examples/json_utils.h b/liblogosdelivery/examples/json_utils.h new file mode 100644 index 000000000..4039ca4f6 --- /dev/null +++ b/liblogosdelivery/examples/json_utils.h @@ -0,0 +1,21 @@ +#ifndef JSON_UTILS_H +#define JSON_UTILS_H + +#include + +// Extract a JSON string field value into buffer. +// Returns pointer to buffer on success, NULL on failure. +// Very basic parser - for production use a proper JSON library. +const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize); + +// Extract a nested JSON object as a raw string. +// Returns a pointer into `json` at the start of the object, and sets `outLen`. +// Handles nested braces. +const char* extract_json_object(const char *json, const char *field, size_t *outLen); + +// Decode a JSON array of integers (byte values) into a buffer. +// Parses e.g. [72,101,108,108,111] into "Hello". +// Returns number of bytes decoded, or -1 on error. +int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize); + +#endif // JSON_UTILS_H diff --git a/liblogosdelivery/examples/logosdelivery_example.c b/liblogosdelivery/examples/logosdelivery_example.c index 61333f84d..729f7f0dc 100644 --- a/liblogosdelivery/examples/logosdelivery_example.c +++ b/liblogosdelivery/examples/logosdelivery_example.c @@ -1,4 +1,5 @@ #include "../liblogosdelivery.h" +#include "json_utils.h" #include #include #include @@ -6,33 +7,10 @@ static int create_node_ok = -1; -// Helper function to extract a JSON string field value -// Very basic parser - for production use a proper JSON library -const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) { - char searchStr[256]; - snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field); - - const char *start = strstr(json, searchStr); - if (!start) { - return NULL; - } - - start += strlen(searchStr); - const char *end = strchr(start, '"'); - if (!end) { - return NULL; - } - - size_t len = end - start; - if (len >= bufSize) { - len = bufSize - 1; - } - - memcpy(buffer, start, len); - buffer[len] = '\0'; - - return buffer; -} +// Flags set by event callback, polled by main thread +static volatile int got_message_sent = 0; +static volatile int got_message_error = 0; +static volatile int got_message_received = 0; // Event callback that handles message events void event_callback(int ret, const char *msg, size_t len, void *userData) { @@ -62,6 +40,7 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) { extract_json_field(eventJson, "requestId", requestId, sizeof(requestId)); extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); printf("[EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash); + got_message_sent = 1; } else if (strcmp(eventType, "message_error") == 0) { char requestId[128]; @@ -72,6 +51,7 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) { extract_json_field(eventJson, "error", error, sizeof(error)); printf("[EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n", requestId, messageHash, error); + got_message_error = 1; } else if (strcmp(eventType, "message_propagated") == 0) { char requestId[128]; @@ -85,6 +65,41 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) { extract_json_field(eventJson, "connectionStatus", connectionStatus, sizeof(connectionStatus)); printf("[EVENT] Connection status change - Status: %s\n", connectionStatus); + } else if (strcmp(eventType, "message_received") == 0) { + char messageHash[128]; + extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); + + // Extract the nested "message" object + size_t msgObjLen = 0; + const char *msgObj = extract_json_object(eventJson, "message", &msgObjLen); + if (msgObj) { + // Make a null-terminated copy of the message object + char *msgJson = malloc(msgObjLen + 1); + if (msgJson) { + memcpy(msgJson, msgObj, msgObjLen); + msgJson[msgObjLen] = '\0'; + + char contentTopic[256]; + extract_json_field(msgJson, "contentTopic", contentTopic, sizeof(contentTopic)); + + // Decode payload from JSON byte array to string + char payload[4096]; + int payloadLen = decode_json_byte_array(msgJson, "payload", payload, sizeof(payload)); + + printf("[EVENT] Message received - Hash: %s, ContentTopic: %s\n", messageHash, contentTopic); + if (payloadLen > 0) { + printf(" Payload (%d bytes): %.*s\n", payloadLen, payloadLen, payload); + } else { + printf(" Payload: (empty or could not decode)\n"); + } + + free(msgJson); + } + } else { + printf("[EVENT] Message received - Hash: %s (could not parse message)\n", messageHash); + } + got_message_received = 1; + } else { printf("[EVENT] Unknown event type: %s\n", eventType); } @@ -146,7 +161,7 @@ int main() { logosdelivery_start_node(ctx, simple_callback, (void *)"start_node"); // Wait for node to start - sleep(10); + sleep(5); printf("\n4. Subscribing to content topic...\n"); const char *contentTopic = "/example/1/chat/proto"; @@ -181,9 +196,18 @@ int main() { "}"; logosdelivery_send(ctx, simple_callback, (void *)"send", message); - // Wait for message events to arrive + // Poll for terminal message events (sent, error, or received) with timeout printf("Waiting for message delivery events...\n"); - sleep(60); + int timeout_sec = 60; + int elapsed = 0; + while (!(got_message_sent || got_message_error || got_message_received) + && elapsed < timeout_sec) { + usleep(100000); // 100ms + elapsed++; + } + if (elapsed >= timeout_sec) { + printf("Timed out waiting for message events after %d seconds\n", timeout_sec); + } printf("\n7. Unsubscribing from content topic...\n"); logosdelivery_unsubscribe(ctx, simple_callback, (void *)"unsubscribe", contentTopic); diff --git a/liblogosdelivery/logos_delivery_api/node_api.nim b/liblogosdelivery/logos_delivery_api/node_api.nim index 1835f75b5..cd644abd7 100644 --- a/liblogosdelivery/logos_delivery_api/node_api.nim +++ b/liblogosdelivery/logos_delivery_api/node_api.nim @@ -35,8 +35,8 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): confValue = parseCmdArg(typeof(confValue), formattedString) except Exception: return err( - "Failed to parse field '" & confField & "': " & - getCurrentExceptionMsg() & ". Value: " & formattedString + "Failed to parse field '" & confField & "': " & getCurrentExceptionMsg() & + ". Value: " & formattedString ) # Create the node @@ -86,7 +86,8 @@ proc logosdelivery_create_node( callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) # free allocated resources as they won't be available ffi.destroyFFIContext(ctx).isOkOr: - chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation", err = $error + chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation", + err = $error return nil return ctx @@ -125,6 +126,15 @@ proc logosdelivery_start_node( chronicles.error "MessagePropagatedEvent.listen failed", err = $error return err("MessagePropagatedEvent.listen failed: " & $error) + let receivedListener = MessageReceivedEvent.listen( + ctx.myLib[].brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + callEventCallback(ctx, "onMessageReceived"): + $newJsonEvent("message_received", event), + ).valueOr: + chronicles.error "MessageReceivedEvent.listen failed", err = $error + return err("MessageReceivedEvent.listen failed: " & $error) + let ConnectionStatusChangeListener = EventConnectionStatusChange.listen( ctx.myLib[].brokerCtx, proc(event: EventConnectionStatusChange) {.async: (raises: []).} = @@ -149,6 +159,7 @@ proc logosdelivery_stop_node( MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx) MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx) MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx) + MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx) EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx) (await ctx.myLib[].stop()).isOkOr: