mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
feat(liblogosdelivery): add logosdelivery_query_store FFI
Expose Waku Store client queries on the liblogosdelivery C ABI so hosts can run historical message queries without libwaku. Implementation mirrors kernel store JSON parsing and delegates to wakuStoreClient.query.
This commit is contained in:
parent
42e0aa43d1
commit
3ef193ab94
@ -71,6 +71,17 @@ extern "C"
|
||||
void *userData,
|
||||
const char *messageJson);
|
||||
|
||||
// Query historical messages via Waku Store against peerAddr (comma-separated multiaddrs).
|
||||
// jsonQuery: JSON matching StoreQueryRequest fields (see logos-delivery library store_api).
|
||||
// On success, the callback message is UTF-8 JSON (hex-encoded StoreQueryResponse).
|
||||
// timeoutMs is reserved for future use (RPC timeouts apply internally today).
|
||||
int logosdelivery_query_store(void *ctx,
|
||||
FFICallBack callback,
|
||||
void *userData,
|
||||
const char *jsonQuery,
|
||||
const char *peerAddr,
|
||||
int timeoutMs);
|
||||
|
||||
// Sets a callback that will be invoked whenever an event occurs.
|
||||
// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe.
|
||||
void logosdelivery_set_event_callback(void *ctx,
|
||||
|
||||
@ -8,4 +8,5 @@ import waku/factory/waku, waku/node/waku_node, ./declare_lib
|
||||
include
|
||||
./logos_delivery_api/node_api,
|
||||
./logos_delivery_api/messaging_api,
|
||||
./logos_delivery_api/store_api,
|
||||
./logos_delivery_api/debug_api
|
||||
|
||||
104
liblogosdelivery/logos_delivery_api/store_api.nim
Normal file
104
liblogosdelivery/logos_delivery_api/store_api.nim
Normal file
@ -0,0 +1,104 @@
|
||||
import std/[json, sugar, strutils, options]
|
||||
import chronos, chronicles, results, stew/byteutils, ffi
|
||||
import
|
||||
waku/factory/waku,
|
||||
library/utils,
|
||||
waku/waku_core/peers,
|
||||
waku/waku_core/message/digest,
|
||||
waku/waku_store/common,
|
||||
waku/waku_store/client,
|
||||
waku/common/paging,
|
||||
../declare_lib
|
||||
|
||||
func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] =
|
||||
var contentTopics: seq[string]
|
||||
if jsonContent.contains("contentTopics"):
|
||||
contentTopics = collect(newSeq):
|
||||
for cTopic in jsonContent["contentTopics"].getElems():
|
||||
cTopic.getStr()
|
||||
|
||||
var msgHashes: seq[WakuMessageHash]
|
||||
if jsonContent.contains("messageHashes"):
|
||||
for hashJsonObj in jsonContent["messageHashes"].getElems():
|
||||
let hash = hashJsonObj.getStr().hexToHash().valueOr:
|
||||
return err("Failed converting message hash hex string to bytes: " & error)
|
||||
msgHashes.add(hash)
|
||||
|
||||
let pubsubTopic =
|
||||
if jsonContent.contains("pubsubTopic"):
|
||||
some(jsonContent["pubsubTopic"].getStr())
|
||||
else:
|
||||
none(string)
|
||||
|
||||
let paginationCursor =
|
||||
if jsonContent.contains("paginationCursor"):
|
||||
let hash = jsonContent["paginationCursor"].getStr().hexToHash().valueOr:
|
||||
return err("Failed converting paginationCursor hex string to bytes: " & error)
|
||||
some(hash)
|
||||
else:
|
||||
none(WakuMessageHash)
|
||||
|
||||
let paginationForwardBool = jsonContent["paginationForward"].getBool()
|
||||
let paginationForward =
|
||||
if paginationForwardBool: PagingDirection.FORWARD else: PagingDirection.BACKWARD
|
||||
|
||||
let paginationLimit =
|
||||
if jsonContent.contains("paginationLimit"):
|
||||
some(uint64(jsonContent["paginationLimit"].getInt()))
|
||||
else:
|
||||
none(uint64)
|
||||
|
||||
let startTime = ?jsonContent.getProtoInt64("timeStart")
|
||||
let endTime = ?jsonContent.getProtoInt64("timeEnd")
|
||||
|
||||
return ok(
|
||||
StoreQueryRequest(
|
||||
requestId: jsonContent["requestId"].getStr(),
|
||||
includeData: jsonContent["includeData"].getBool(),
|
||||
pubsubTopic: pubsubTopic,
|
||||
contentTopics: contentTopics,
|
||||
startTime: startTime,
|
||||
endTime: endTime,
|
||||
messageHashes: msgHashes,
|
||||
paginationCursor: paginationCursor,
|
||||
paginationForward: paginationForward,
|
||||
paginationLimit: paginationLimit,
|
||||
)
|
||||
)
|
||||
|
||||
proc logosdelivery_query_store(
|
||||
ctx: ptr FFIContext[Waku],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
jsonQuery: cstring,
|
||||
peerAddr: cstring,
|
||||
timeoutMs: cint,
|
||||
) {.ffi.} =
|
||||
## Queries historical messages via the node's Store client (Waku Store protocol).
|
||||
## Mirrors ``library/kernel_api/protocols/store_api.waku_store_query`` but uses the
|
||||
## ``logosdelivery_query_store`` C symbol consumed by ``logos-delivery-module``.
|
||||
##
|
||||
## ``timeoutMs`` is reserved for future timeout plumbing; the Store client's RPC uses its own timeouts today.
|
||||
requireInitializedNode(ctx, "QUERY_STORE"):
|
||||
return err(errMsg)
|
||||
|
||||
discard timeoutMs
|
||||
|
||||
let jsonContentRes = catch:
|
||||
parseJson($jsonQuery)
|
||||
|
||||
if jsonContentRes.isErr():
|
||||
return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg)
|
||||
|
||||
let storeQueryRequest = ?fromJsonNode(jsonContentRes.get())
|
||||
|
||||
let peer = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
|
||||
return err("StoreRequest failed to parse peer addr: " & $error)
|
||||
|
||||
let queryResponse = (
|
||||
await ctx.myLib[].node.wakuStoreClient.query(storeQueryRequest, peer)
|
||||
).valueOr:
|
||||
return err("StoreRequest failed store query: " & $error)
|
||||
|
||||
let res = $(%*(queryResponse.toHex()))
|
||||
return ok(res)
|
||||
Loading…
x
Reference in New Issue
Block a user