feat: allowing configuration of application level callbacks (#3206)

This commit is contained in:
gabrielmer 2024-12-13 17:38:16 +01:00 committed by GitHub
parent ab0c1d4aa0
commit 049fbeabbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 68 additions and 12 deletions

View File

@ -27,7 +27,8 @@ import
./waku_thread/inter_thread_communication/requests/ping_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc,
./ffi_types
./ffi_types,
../waku/factory/app_callbacks
################################################################################
### Wrapper around the waku node
@ -138,10 +139,14 @@ proc waku_new(
ctx.userData = userData
let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))
let retCode = handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson),
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks
),
callback,
userData,
)

View File

@ -7,6 +7,7 @@ import
../../../../waku/factory/waku,
../../../../waku/factory/node_factory,
../../../../waku/factory/networks_config,
../../../../waku/factory/app_callbacks,
../../../alloc
type NodeLifecycleMsgType* = enum
@ -17,12 +18,17 @@ type NodeLifecycleMsgType* = enum
type NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
appCallbacks: AppCallbacks
proc createShared*(
T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = ""
T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = "",
appCallbacks: AppCallbacks = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].appCallbacks = appCallbacks
ret[].configJson = configJson.alloc()
return ret
@ -30,7 +36,9 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[Waku, string]] {.async.} =
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating node: " & error)
@ -59,7 +67,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
formattedString & ". expected type: " & $typeof(confValue)
)
let wakuRes = Waku.new(conf).valueOr:
let wakuRes = Waku.new(conf, appCallbacks).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)
@ -73,7 +81,7 @@ proc process*(
case self.operation
of CREATE_NODE:
waku[] = (await createWaku(self.configJson)).valueOr:
waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr:
error "CREATE_NODE failed", error = error
return err("error processing createWaku request: " & $error)
of START_NODE:

View File

@ -0,0 +1,4 @@
import ../waku_relay/protocol
type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler

View File

@ -124,6 +124,16 @@ proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)
proc getAutoshards*(
node: WakuNode, contentTopics: seq[string]
): Result[seq[RelayShard], string] =
var autoShards: seq[RelayShard]
for contentTopic in contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
return ok(autoshards)
proc setupProtocols(
node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey
): Future[Result[void, string]] {.async.} =
@ -169,11 +179,8 @@ proc setupProtocols(
peerExchangeHandler = some(handlePeerExchange)
var autoShards: seq[RelayShard]
for contentTopic in conf.contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = autoShards

View File

@ -42,6 +42,7 @@ import
../factory/node_factory,
../factory/internal_config,
../factory/external_config,
../factory/app_callbacks,
../waku_enr/multiaddr
logScope:
@ -67,6 +68,7 @@ type Waku* = ref object
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
appCallbacks*: AppCallbacks
proc logConfig(conf: WakuNodeConf) =
info "Configuration: Enabled protocols",
@ -146,7 +148,32 @@ proc newCircuitRelay(isRelayClient: bool): Relay =
return RelayClient.new()
return Relay.new()
proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
proc setupAppCallbacks(
node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks
): Result[void, string] =
if appCallbacks.isNil():
info "No external callbacks to be set"
return ok()
if not appCallbacks.relayHandler.isNil():
if node.wakuRelay.isNil():
return err("Cannot configure relayHandler callback without Relay mounted")
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
for shard in shards:
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)
return ok()
proc new*(
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
): Result[Waku, string] =
let rng = crypto.newRng()
logging.setupLog(confCopy.logLevel, confCopy.logFormat)
@ -225,6 +252,10 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
let node = nodeRes.get()
node.setupAppCallbacks(confCopy, appCallbacks).isOkOr:
error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error)
## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled:
@ -246,6 +277,7 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
key: confCopy.nodekey.get(),
node: node,
deliveryMonitor: deliveryMonitor,
appCallbacks: appCallbacks,
)
waku.setupSwitchServices(confCopy, relay, rng)