liblogosdelivery supports MessageReceivedEvent propagation over FFI. Adjusted example. (#3747)

This commit is contained in:
NagyZoltanPeter 2026-03-04 09:48:48 +01:00 committed by GitHub
parent 4a6ad73235
commit 0ad55159b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 188 additions and 33 deletions

View File

@ -469,6 +469,7 @@ logosdelivery_example: | build liblogosdelivery
ifeq ($(detected_OS),Darwin)
gcc -o build/$@ \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \
@ -476,6 +477,7 @@ ifeq ($(detected_OS),Darwin)
else ifeq ($(detected_OS),Linux)
gcc -o build/$@ \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \
@ -483,6 +485,7 @@ else ifeq ($(detected_OS),Linux)
else ifeq ($(detected_OS),Windows)
gcc -o build/$@.exe \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \

View File

@ -0,0 +1,96 @@
#include "json_utils.h"
#include <stdio.h>
#include <string.h>
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) {
char searchStr[256];
snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field);
const char *start = strstr(json, searchStr);
if (!start) {
return NULL;
}
start += strlen(searchStr);
const char *end = strchr(start, '"');
if (!end) {
return NULL;
}
size_t len = end - start;
if (len >= bufSize) {
len = bufSize - 1;
}
memcpy(buffer, start, len);
buffer[len] = '\0';
return buffer;
}
const char* extract_json_object(const char *json, const char *field, size_t *outLen) {
char searchStr[256];
snprintf(searchStr, sizeof(searchStr), "\"%s\":{", field);
const char *start = strstr(json, searchStr);
if (!start) {
return NULL;
}
// Advance to the opening brace
start = strchr(start, '{');
if (!start) {
return NULL;
}
// Find the matching closing brace (handles nested braces)
int depth = 0;
const char *p = start;
while (*p) {
if (*p == '{') depth++;
else if (*p == '}') {
depth--;
if (depth == 0) {
*outLen = (size_t)(p - start + 1);
return start;
}
}
p++;
}
return NULL;
}
int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize) {
char searchStr[256];
snprintf(searchStr, sizeof(searchStr), "\"%s\":[", field);
const char *start = strstr(json, searchStr);
if (!start) {
return -1;
}
// Advance to the opening bracket
start = strchr(start, '[');
if (!start) {
return -1;
}
start++; // skip '['
size_t pos = 0;
const char *p = start;
while (*p && *p != ']' && pos < bufSize - 1) {
// Skip whitespace and commas
while (*p == ' ' || *p == ',' || *p == '\n' || *p == '\r' || *p == '\t') p++;
if (*p == ']') break;
// Parse integer
int val = 0;
while (*p >= '0' && *p <= '9') {
val = val * 10 + (*p - '0');
p++;
}
buffer[pos++] = (char)val;
}
buffer[pos] = '\0';
return (int)pos;
}

View File

@ -0,0 +1,21 @@
#ifndef JSON_UTILS_H
#define JSON_UTILS_H
#include <stddef.h>
// Extract a JSON string field value into buffer.
// Returns pointer to buffer on success, NULL on failure.
// Very basic parser - for production use a proper JSON library.
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize);
// Extract a nested JSON object as a raw string.
// Returns a pointer into `json` at the start of the object, and sets `outLen`.
// Handles nested braces.
const char* extract_json_object(const char *json, const char *field, size_t *outLen);
// Decode a JSON array of integers (byte values) into a buffer.
// Parses e.g. [72,101,108,108,111] into "Hello".
// Returns number of bytes decoded, or -1 on error.
int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize);
#endif // JSON_UTILS_H

View File

@ -1,4 +1,5 @@
#include "../liblogosdelivery.h"
#include "json_utils.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
@ -6,33 +7,10 @@
static int create_node_ok = -1;
// Helper function to extract a JSON string field value
// Very basic parser - for production use a proper JSON library
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) {
char searchStr[256];
snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field);
const char *start = strstr(json, searchStr);
if (!start) {
return NULL;
}
start += strlen(searchStr);
const char *end = strchr(start, '"');
if (!end) {
return NULL;
}
size_t len = end - start;
if (len >= bufSize) {
len = bufSize - 1;
}
memcpy(buffer, start, len);
buffer[len] = '\0';
return buffer;
}
// Flags set by event callback, polled by main thread
static volatile int got_message_sent = 0;
static volatile int got_message_error = 0;
static volatile int got_message_received = 0;
// Event callback that handles message events
void event_callback(int ret, const char *msg, size_t len, void *userData) {
@ -62,6 +40,7 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) {
extract_json_field(eventJson, "requestId", requestId, sizeof(requestId));
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
printf("[EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash);
got_message_sent = 1;
} else if (strcmp(eventType, "message_error") == 0) {
char requestId[128];
@ -72,6 +51,7 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) {
extract_json_field(eventJson, "error", error, sizeof(error));
printf("[EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n",
requestId, messageHash, error);
got_message_error = 1;
} else if (strcmp(eventType, "message_propagated") == 0) {
char requestId[128];
@ -85,6 +65,41 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) {
extract_json_field(eventJson, "connectionStatus", connectionStatus, sizeof(connectionStatus));
printf("[EVENT] Connection status change - Status: %s\n", connectionStatus);
} else if (strcmp(eventType, "message_received") == 0) {
char messageHash[128];
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
// Extract the nested "message" object
size_t msgObjLen = 0;
const char *msgObj = extract_json_object(eventJson, "message", &msgObjLen);
if (msgObj) {
// Make a null-terminated copy of the message object
char *msgJson = malloc(msgObjLen + 1);
if (msgJson) {
memcpy(msgJson, msgObj, msgObjLen);
msgJson[msgObjLen] = '\0';
char contentTopic[256];
extract_json_field(msgJson, "contentTopic", contentTopic, sizeof(contentTopic));
// Decode payload from JSON byte array to string
char payload[4096];
int payloadLen = decode_json_byte_array(msgJson, "payload", payload, sizeof(payload));
printf("[EVENT] Message received - Hash: %s, ContentTopic: %s\n", messageHash, contentTopic);
if (payloadLen > 0) {
printf(" Payload (%d bytes): %.*s\n", payloadLen, payloadLen, payload);
} else {
printf(" Payload: (empty or could not decode)\n");
}
free(msgJson);
}
} else {
printf("[EVENT] Message received - Hash: %s (could not parse message)\n", messageHash);
}
got_message_received = 1;
} else {
printf("[EVENT] Unknown event type: %s\n", eventType);
}
@ -146,7 +161,7 @@ int main() {
logosdelivery_start_node(ctx, simple_callback, (void *)"start_node");
// Wait for node to start
sleep(10);
sleep(5);
printf("\n4. Subscribing to content topic...\n");
const char *contentTopic = "/example/1/chat/proto";
@ -181,9 +196,18 @@ int main() {
"}";
logosdelivery_send(ctx, simple_callback, (void *)"send", message);
// Wait for message events to arrive
// Poll for terminal message events (sent, error, or received) with timeout
printf("Waiting for message delivery events...\n");
sleep(60);
int timeout_sec = 60;
int elapsed = 0;
while (!(got_message_sent || got_message_error || got_message_received)
&& elapsed < timeout_sec) {
usleep(100000); // 100ms
elapsed++;
}
if (elapsed >= timeout_sec) {
printf("Timed out waiting for message events after %d seconds\n", timeout_sec);
}
printf("\n7. Unsubscribing from content topic...\n");
logosdelivery_unsubscribe(ctx, simple_callback, (void *)"unsubscribe", contentTopic);

View File

@ -35,8 +35,8 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
confValue = parseCmdArg(typeof(confValue), formattedString)
except Exception:
return err(
"Failed to parse field '" & confField & "': " &
getCurrentExceptionMsg() & ". Value: " & formattedString
"Failed to parse field '" & confField & "': " & getCurrentExceptionMsg() &
". Value: " & formattedString
)
# Create the node
@ -86,7 +86,8 @@ proc logosdelivery_create_node(
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
# free allocated resources as they won't be available
ffi.destroyFFIContext(ctx).isOkOr:
chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation", err = $error
chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation",
err = $error
return nil
return ctx
@ -125,6 +126,15 @@ proc logosdelivery_start_node(
chronicles.error "MessagePropagatedEvent.listen failed", err = $error
return err("MessagePropagatedEvent.listen failed: " & $error)
let receivedListener = MessageReceivedEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageReceived"):
$newJsonEvent("message_received", event),
).valueOr:
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
ctx.myLib[].brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
@ -149,6 +159,7 @@ proc logosdelivery_stop_node(
MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)
(await ctx.myLib[].stop()).isOkOr: