From 049fbeabbb636cc17afd40d09c252e923af87f3d Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:38:16 +0100 Subject: [PATCH] feat: allowing configuration of application level callbacks (#3206) --- library/libwaku.nim | 9 +++-- .../requests/node_lifecycle_request.nim | 16 ++++++--- waku/factory/app_callbacks.nim | 4 +++ waku/factory/node_factory.nim | 17 +++++++--- waku/factory/waku.nim | 34 ++++++++++++++++++- 5 files changed, 68 insertions(+), 12 deletions(-) create mode 100644 waku/factory/app_callbacks.nim diff --git a/library/libwaku.nim b/library/libwaku.nim index 3ab7e03e7..13022f879 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -27,7 +27,8 @@ import ./waku_thread/inter_thread_communication/requests/ping_request, ./waku_thread/inter_thread_communication/waku_thread_request, ./alloc, - ./ffi_types + ./ffi_types, + ../waku/factory/app_callbacks ################################################################################ ### Wrapper around the waku node @@ -138,10 +139,14 @@ proc waku_new( ctx.userData = userData + let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx)) + let retCode = handleRequest( ctx, RequestType.LIFECYCLE, - NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), + NodeLifecycleRequest.createShared( + NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks + ), callback, userData, ) diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 2b2edf038..087a78d3e 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -7,6 +7,7 @@ import ../../../../waku/factory/waku, ../../../../waku/factory/node_factory, ../../../../waku/factory/networks_config, + ../../../../waku/factory/app_callbacks, ../../../alloc type NodeLifecycleMsgType* = enum @@ -17,12 +18,17 @@ type NodeLifecycleMsgType* = enum type NodeLifecycleRequest* = object operation: NodeLifecycleMsgType configJson: cstring ## Only used in 'CREATE_NODE' operation + appCallbacks: AppCallbacks proc createShared*( - T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = "" + T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType, + configJson: cstring = "", + appCallbacks: AppCallbacks = nil, ): ptr type T = var ret = createShared(T) ret[].operation = op + ret[].appCallbacks = appCallbacks ret[].configJson = configJson.alloc() return ret @@ -30,7 +36,9 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) -proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = +proc createWaku( + configJson: cstring, appCallbacks: AppCallbacks = nil +): Future[Result[Waku, string]] {.async.} = var conf = defaultWakuNodeConf().valueOr: return err("Failed creating node: " & error) @@ -59,7 +67,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = formattedString & ". expected type: " & $typeof(confValue) ) - let wakuRes = Waku.new(conf).valueOr: + let wakuRes = Waku.new(conf, appCallbacks).valueOr: error "waku initialization failed", error = error return err("Failed setting up Waku: " & $error) @@ -73,7 +81,7 @@ proc process*( case self.operation of CREATE_NODE: - waku[] = (await createWaku(self.configJson)).valueOr: + waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr: error "CREATE_NODE failed", error = error return err("error processing createWaku request: " & $error) of START_NODE: diff --git a/waku/factory/app_callbacks.nim b/waku/factory/app_callbacks.nim new file mode 100644 index 000000000..ffab59c24 --- /dev/null +++ b/waku/factory/app_callbacks.nim @@ -0,0 +1,4 @@ +import ../waku_relay/protocol + +type AppCallbacks* = ref object + relayHandler*: WakuRelayHandler diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index bf45cb0d2..80734d0b8 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -124,6 +124,16 @@ proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding return uint32(MaxShardIndex + 1) +proc getAutoshards*( + node: WakuNode, contentTopics: seq[string] +): Result[seq[RelayShard], string] = + var autoShards: seq[RelayShard] + for contentTopic in contentTopics: + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + return err("Could not parse content topic: " & error) + autoShards.add(shard) + return ok(autoshards) + proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey ): Future[Result[void, string]] {.async.} = @@ -169,11 +179,8 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - var autoShards: seq[RelayShard] - for contentTopic in conf.contentTopics: - let shard = node.wakuSharding.getShard(contentTopic).valueOr: - return err("Could not parse content topic: " & error) - autoShards.add(shard) + let autoShards = node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) debug "Shards created from content topics", contentTopics = conf.contentTopics, shards = autoShards diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 4843d1b1f..4fed2f1dc 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -42,6 +42,7 @@ import ../factory/node_factory, ../factory/internal_config, ../factory/external_config, + ../factory/app_callbacks, ../waku_enr/multiaddr logScope: @@ -67,6 +68,7 @@ type Waku* = ref object restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef + appCallbacks*: AppCallbacks proc logConfig(conf: WakuNodeConf) = info "Configuration: Enabled protocols", @@ -146,7 +148,32 @@ proc newCircuitRelay(isRelayClient: bool): Relay = return RelayClient.new() return Relay.new() -proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = +proc setupAppCallbacks( + node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks +): Result[void, string] = + if appCallbacks.isNil(): + info "No external callbacks to be set" + return ok() + + if not appCallbacks.relayHandler.isNil(): + if node.wakuRelay.isNil(): + return err("Cannot configure relayHandler callback without Relay mounted") + + let autoShards = node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) + + let confShards = + conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let shards = confShards & autoShards + + for shard in shards: + discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler) + + return ok() + +proc new*( + T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil +): Result[Waku, string] = let rng = crypto.newRng() logging.setupLog(confCopy.logLevel, confCopy.logFormat) @@ -225,6 +252,10 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = let node = nodeRes.get() + node.setupAppCallbacks(confCopy, appCallbacks).isOkOr: + error "Failed setting up app callbacks", error = error + return err("Failed setting up app callbacks: " & $error) + ## Delivery Monitor var deliveryMonitor: DeliveryMonitor if confCopy.reliabilityEnabled: @@ -246,6 +277,7 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = key: confCopy.nodekey.get(), node: node, deliveryMonitor: deliveryMonitor, + appCallbacks: appCallbacks, ) waku.setupSwitchServices(confCopy, relay, rng)