From 2a03416f47a2e6726eba90fc203b7d761a4035d0 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 11 Dec 2023 08:49:13 +0100 Subject: [PATCH] chore: Cbindings allow mounting the Store protocol from libwaku (#2276) * libwaku: add changes to mount store in self-node * libwaku: remove unnecessary code for store --- examples/cbindings/waku_example.c | 39 +++++++-- library/callback.nim | 5 ++ library/libwaku.nim | 28 ++++-- library/waku_thread/config.nim | 78 ++++++++++++++++- .../requests/node_lifecycle_request.nim | 85 +++++++++++++++++++ .../requests/protocols/store_request.nim | 75 ++++++++++++++++ .../waku_thread_request.nim | 6 +- 7 files changed, 300 insertions(+), 16 deletions(-) create mode 100644 library/callback.nim create mode 100644 library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index ec1b8f9d3..dcb1d74d9 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -28,6 +28,13 @@ struct ConfigNode { char key[128]; int relay; char peers[2048]; + int store; + char storeNode[2048]; + char storeRetentionPolicy[64]; + char storeDbUrl[256]; + int storeVacuum; + int storeDbMigration; + int storeMaxNumDbConnections; }; // libwaku Context @@ -247,6 +254,14 @@ int main(int argc, char** argv) { cfgNode.port = 60000; cfgNode.relay = 1; + cfgNode.store = 1; + snprintf(cfgNode.storeNode, 2048, ""); + snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000"); + snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres"); + cfgNode.storeVacuum = 0; + cfgNode.storeDbMigration = 0; + cfgNode.storeMaxNumDbConnections = 30; + if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN) { show_help_and_exit(); @@ -254,16 +269,24 @@ int main(int argc, char** argv) { ctx = waku_init(event_handler, userData); - char jsonConfig[1024]; - snprintf(jsonConfig, 1024, "{ \ - \"host\": \"%s\", \ - \"port\": %d, \ - \"key\": \"%s\", \ - \"relay\": %s \ + char jsonConfig[2048]; + snprintf(jsonConfig, 2048, "{ \ + \"host\": \"%s\", \ + \"port\": %d, \ + \"key\": \"%s\", \ + \"relay\": %s, \ + \"store\": %s, \ + \"storeDbUrl\": \"%s\", \ + \"storeRetentionPolicy\": \"%s\", \ + \"storeMaxNumDbConnections\": %d \ }", cfgNode.host, cfgNode.port, cfgNode.key, - cfgNode.relay ? "true":"false"); + cfgNode.relay ? "true":"false", + cfgNode.store ? "true":"false", + cfgNode.storeDbUrl, + cfgNode.storeRetentionPolicy, + cfgNode.storeMaxNumDbConnections); WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) ); WAKU_CALL( waku_version(&ctx, print_waku_version, userData) ); @@ -272,7 +295,6 @@ int main(int argc, char** argv) { printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) ); - waku_set_event_callback(event_handler, userData); waku_start(&ctx, event_handler, userData); @@ -288,6 +310,7 @@ int main(int argc, char** argv) { "/waku/2/default-waku/proto", event_handler, userData) ); + show_main_menu(); while(1) { handle_user_input(); diff --git a/library/callback.nim b/library/callback.nim new file mode 100644 index 000000000..79cb0c005 --- /dev/null +++ b/library/callback.nim @@ -0,0 +1,5 @@ + +type + WakuCallBack* = proc(callerRet: cint, + msg: ptr cchar, + len: csize_t) {.cdecl, gcsafe.} diff --git a/library/libwaku.nim b/library/libwaku.nim index 9f6449d53..c60c752bd 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -18,8 +18,10 @@ import ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request, ./waku_thread/inter_thread_communication/requests/protocols/relay_request, + ./waku_thread/inter_thread_communication/requests/protocols/store_request, ./waku_thread/inter_thread_communication/waku_thread_request, - ./alloc + ./alloc, + ./callback ################################################################################ ### Wrapper around the waku node @@ -32,11 +34,6 @@ const RET_OK: cint = 0 const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 -type - WakuCallBack* = proc(callerRet: cint, - msg: ptr cchar, - len: csize_t) {.cdecl, gcsafe.} - ### End of exported types ################################################################################ @@ -348,5 +345,24 @@ proc waku_connect(ctx: ptr ptr Context, return RET_OK +proc waku_store_query(ctx: ptr ptr Context, + queryJson: cstring, + peerId: cstring, + timeoutMs: cint, + callback: WakuCallBack, + userData: pointer): cint + {.dynlib, exportc.} = + + ctx[][].userData = userData + + ## TODO: implement the logic that make the "self" node to act as a Store client + + # if sendReqRes.isErr(): + # let msg = $sendReqRes.error + # callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR + + return RET_OK + ### End of exported procs ################################################################################ diff --git a/library/waku_thread/config.nim b/library/waku_thread/config.nim index 6309b6771..29ca7b11c 100644 --- a/library/waku_thread/config.nim +++ b/library/waku_thread/config.nim @@ -91,6 +91,71 @@ proc parseRelay(jsonNode: JsonNode, return true +proc parseStore(jsonNode: JsonNode, + store: var bool, + storeNode: var string, + storeRetentionPolicy: var string, + storeDbUrl: var string, + storeVacuum: var bool, + storeDbMigration: var bool, + storeMaxNumDbConnections: var int, + jsonResp: var JsonEvent): bool = + + if not jsonNode.contains("store"): + ## the store parameter is not required. By default is is disabled + store = false + return true + + if jsonNode["store"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The store config param should be a boolean"); + return false + + store = jsonNode["store"].getBool() + + if jsonNode.contains("storeNode"): + if jsonNode["storeNode"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeNode config param should be a string"); + return false + + storeNode = jsonNode["storeNode"].getStr() + + if jsonNode.contains("storeRetentionPolicy"): + if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string"); + return false + + storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr() + + if jsonNode.contains("storeDbUrl"): + if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string"); + return false + + storeDbUrl = jsonNode["storeDbUrl"].getStr() + + if jsonNode.contains("storeVacuum"): + if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool"); + return false + + storeVacuum = jsonNode["storeVacuum"].getBool() + + if jsonNode.contains("storeDbMigration"): + if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool"); + return false + + storeDbMigration = jsonNode["storeDbMigration"].getBool() + + if jsonNode.contains("storeMaxNumDbConnections"): + if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt: + jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int"); + return false + + storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt() + + return true + proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) = if jsonNode.contains("topics"): for topic in jsonNode["topics"].items: @@ -103,6 +168,13 @@ proc parseConfig*(configNodeJson: string, netConfig: var NetConfig, relay: var bool, topics: var seq[string], + store: var bool, + storeNode: var string, + storeRetentionPolicy: var string, + storeDbUrl: var string, + storeVacuum: var bool, + storeDbMigration: var bool, + storeMaxNumDbConnections: var int, jsonResp: var JsonEvent): bool = if configNodeJson.len == 0: @@ -110,7 +182,6 @@ proc parseConfig*(configNodeJson: string, return false var jsonNode: JsonNode - try: jsonNode = parseJson(configNodeJson) except JsonParsingError: @@ -152,6 +223,11 @@ proc parseConfig*(configNodeJson: string, # topics parseTopics(jsonNode, topics) + # store + if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl, + storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp): + return false + let wakuFlags = CapabilitiesBitfield.init( lightpush = false, filter = false, diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index cf134ceeb..bf83ab1a7 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -3,6 +3,7 @@ import std/options import chronos, + chronicles, stew/results, stew/shims/net import @@ -17,7 +18,12 @@ import ../../../../waku/node/waku_node, ../../../../waku/node/builder, ../../../../waku/node/config, + ../../../../waku/waku_archive/driver/builder, + ../../../../waku/waku_archive/driver, + ../../../../waku/waku_archive/retention_policy/builder, + ../../../../waku/waku_archive/retention_policy, ../../../../waku/waku_relay/protocol, + ../../../../waku/waku_store, ../../../events/[json_error_event,json_message_event,json_base_event], ../../../alloc, ../../config @@ -46,14 +52,77 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) +proc configureStore(node: WakuNode, + storeNode: string, + storeRetentionPolicy: string, + storeDbUrl: string, + storeVacuum: bool, + storeDbMigration: bool, + storeMaxNumDbConnections: int): + Future[Result[void, string]] {.async.} = + ## This snippet is extracted/duplicated from the app.nim file + + var onErrAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. + # error "Unrecoverable error occurred", error = msg + ## TODO: use a callback given as a parameter + discard + + # Archive setup + let archiveDriverRes = ArchiveDriver.new(storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections, + onErrAction) + if archiveDriverRes.isErr(): + return err("failed to setup archive driver: " & archiveDriverRes.error) + + let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy) + if retPolicyRes.isErr(): + return err("failed to create retention policy: " & retPolicyRes.error) + + let mountArcRes = node.mountArchive(archiveDriverRes.get(), + retPolicyRes.get()) + if mountArcRes.isErr(): + return err("failed to mount waku archive protocol: " & mountArcRes.error) + + # Store setup + try: + await mountStore(node) + except CatchableError: + return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + + mountStoreClient(node) + if storeNode != "": + let storeNodeInfo = parsePeerInfo(storeNode) + if storeNodeInfo.isOk(): + node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec) + else: + return err("failed to set node waku store peer: " & storeNodeInfo.error) + + return ok() + proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} = var privateKey: PrivateKey var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), Port(60000'u16)).value + + ## relay var relay: bool var topics = @[""] + + ## store + var store: bool + var storeNode: string + var storeRetentionPolicy: string + var storeDbUrl: string + var storeVacuum: bool + var storeDbMigration: bool + var storeMaxNumDbConnections: int + var jsonResp: JsonEvent if not parseConfig($configJson, @@ -61,6 +130,13 @@ proc createNode(configJson: cstring): netConfig, relay, topics, + store, + storeNode, + storeRetentionPolicy, + storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections, jsonResp): return err($jsonResp) @@ -113,6 +189,15 @@ proc createNode(configJson: cstring): await newNode.mountRelay() newNode.peerManager.start() + if store: + (await newNode.configureStore(storeNode, + storeRetentionPolicy, + storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections)).isOkOr: + return err("error configuring store: " & $error) + return ok(newNode) proc process*(self: ptr NodeLifecycleRequest, diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim new file mode 100644 index 000000000..2ae76903e --- /dev/null +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -0,0 +1,75 @@ + +import + std/[options,sequtils,strutils] +import + chronos, + stew/results, + stew/shims/net +import + ../../../../../waku/node/waku_node, + ../../../../../waku/waku_archive/driver/builder, + ../../../../../waku/waku_archive/driver, + ../../../../../waku/waku_archive/retention_policy/builder, + ../../../../../waku/waku_archive/retention_policy, + ../../../../alloc, + ../../../../callback + +type + StoreReqType* = enum + REMOTE_QUERY ## to perform a query to another Store node + LOCAL_QUERY ## to retrieve the data from 'self' node + +type + StoreQueryRequest* = object + queryJson: cstring + peerAddr: cstring + timeoutMs: cint + storeCallback: WakuCallBack + +type + StoreRequest* = object + operation: StoreReqType + storeReq: pointer + +proc createShared*(T: type StoreRequest, + operation: StoreReqType, + request: pointer): ptr type T = + var ret = createShared(T) + ret[].request = request + return ret + +proc createShared*(T: type StoreQueryRequest, + queryJson: cstring, + peerAddr: cstring, + timeoutMs: cint, + storeCallback: WakuCallBack = nil): ptr type T = + + var ret = createShared(T) + ret[].timeoutMs = timeoutMs + ret[].queryJson = queryJson.alloc() + ret[].peerAddr = peerAddr.alloc() + ret[].storeCallback = storeCallback + return ret + +proc destroyShared(self: ptr StoreQueryRequest) = + deallocShared(self[].queryJson) + deallocShared(self[].peerAddr) + deallocShared(self) + +proc process(self: ptr StoreQueryRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + defer: destroyShared(self) + +proc process*(self: ptr StoreRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + + defer: deallocShared(self) + + case self.operation: + of REMOTE_QUERY: + return await cast[ptr StoreQueryRequest](self[].storeReq).process(node) + of LOCAL_QUERY: + discard + # cast[ptr StoreQueryRequest](request[].reqContent).process(node) + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 79ba7797d..a38c09dba 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -12,13 +12,15 @@ import ../../../waku/node/waku_node, ./requests/node_lifecycle_request, ./requests/peer_manager_request, - ./requests/protocols/relay_request + ./requests/protocols/relay_request, + ./requests/protocols/store_request type RequestType* {.pure.} = enum LIFECYCLE, PEER_MANAGER, RELAY, + STORE, type InterThreadRequest* = object @@ -50,6 +52,8 @@ proc process*(T: type InterThreadRequest, cast[ptr PeerManagementRequest](request[].reqContent).process(node[]) of RELAY: cast[ptr RelayRequest](request[].reqContent).process(node) + of STORE: + cast[ptr StoreRequest](request[].reqContent).process(node) return await retFut