NagyZoltanPeter 1fd25355e0
feat: waku api send (#3669)
* Introduce api/send
Added events and requests for support.
Reworked delivery_monitor into a featured devlivery_service, that
- supports relay publish and lightpush depending on configuration but with fallback options
- if available and configured it utilizes store api to confirm message delivery
- emits message delivery events accordingly

prepare for use in api_example

* Fix edge mode config and test added
* Fix some import issues, start and stop waku shall not throw exception but return with result properly
* Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe where appropriate, removed leftover
* add api_example app to examples2
* Adapt after merge from master
* Adapt code for using broker context
* Fix brokerCtx settings for all usedbrokers, cover locked node init
* Various fixes upon test failures. Added initial of subscribe API and auto-subscribe for send api
* More test added
* Fix multi propagate event emit, fix fail send test case
* Fix rebase
* Fix PushMessageHandlers in tests
* adapt libwaku to api changes
* Fix relay test by adapting publish return error in case NoPeersToPublish
* Addressing all remaining review findings. Removed leftovers. Fixed loggings and typos
* Fix rln relay broker, missed brokerCtx
* Fix rest relay test failed, due to publish will fail if no peer avail
* ignore anvil test state file
* Make terst_wakunode_rln_relay broker context aware to fix
* Fix waku rln tests by having them broker context aware
* fix typo in test_app.nim
2026-01-30 01:06:00 +01:00

79 lines
2.7 KiB
Nim

import std/options
import chronos, chronicles
import waku/[waku_core], waku/waku_lightpush/[common, rpc]
import waku/requests/health_request
import waku/common/broker/broker_context
import waku/api/types
import ./[delivery_task, send_processor]
logScope:
topics = "send service relay processor"
type RelaySendProcessor* = ref object of BaseSendProcessor
publishProc: PushMessageHandler
fallbackStateToSet: DeliveryState
proc new*(
T: typedesc[RelaySendProcessor],
lightpushAvailable: bool,
publishProc: PushMessageHandler,
brokerCtx: BrokerContext,
): RelaySendProcessor =
let fallbackStateToSet =
if lightpushAvailable:
DeliveryState.FallbackRetry
else:
DeliveryState.FailedToDeliver
return RelaySendProcessor(
publishProc: publishProc,
fallbackStateToSet: fallbackStateToSet,
brokerCtx: brokerCtx,
)
proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
error "isTopicHealthy: failed to get health report", topic = topic, error = error
return false
if healthReport.topicHealth.len() < 1:
warn "isTopicHealthy: no topic health entries", topic = topic
return false
let health = healthReport.topicHealth[0].health
debug "isTopicHealthy: topic health is ", topic = topic, health = health
return health == MINIMALLY_HEALTHY or health == SUFFICIENTLY_HEALTHY
method isValidProcessor*(
self: RelaySendProcessor, task: DeliveryTask
): bool {.gcsafe.} =
# Topic health query is not reliable enough after a fresh subscribe...
# return self.isTopicHealthy(task.pubsubTopic)
return true
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} =
task.tryCount.inc()
info "Trying message delivery via Relay",
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
tryCount = task.tryCount
let noOfPublishedPeers = (await self.publishProc(task.pubsubTopic, task.msg)).valueOr:
let errorMessage = error.desc.get($error.code)
error "Failed to publish message with relay",
request = task.requestId, msgHash = task.msgHash.to0xHex(), error = errorMessage
if error.code != LightPushErrorCode.NO_PEERS_TO_RELAY:
task.state = DeliveryState.FailedToDeliver
task.errorDesc = errorMessage
else:
task.state = self.fallbackStateToSet
return
if noOfPublishedPeers > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash.to0xHex(), noOfPeers = noOfPublishedPeers
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
else:
# It shall not happen, but still covering it
task.state = self.fallbackStateToSet