NagyZoltanPeter 1fd25355e0
feat: waku api send (#3669)
* Introduce api/send
Added events and requests for support.
Reworked delivery_monitor into a featured devlivery_service, that
- supports relay publish and lightpush depending on configuration but with fallback options
- if available and configured it utilizes store api to confirm message delivery
- emits message delivery events accordingly

prepare for use in api_example

* Fix edge mode config and test added
* Fix some import issues, start and stop waku shall not throw exception but return with result properly
* Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe where appropriate, removed leftover
* add api_example app to examples2
* Adapt after merge from master
* Adapt code for using broker context
* Fix brokerCtx settings for all usedbrokers, cover locked node init
* Various fixes upon test failures. Added initial of subscribe API and auto-subscribe for send api
* More test added
* Fix multi propagate event emit, fix fail send test case
* Fix rebase
* Fix PushMessageHandlers in tests
* adapt libwaku to api changes
* Fix relay test by adapting publish return error in case NoPeersToPublish
* Addressing all remaining review findings. Removed leftovers. Fixed loggings and typos
* Fix rln relay broker, missed brokerCtx
* Fix rest relay test failed, due to publish will fail if no peer avail
* ignore anvil test state file
* Make terst_wakunode_rln_relay broker context aware to fix
* Fix waku rln tests by having them broker context aware
* fix typo in test_app.nim
2026-01-30 01:06:00 +01:00

131 lines
4.1 KiB
Nim

{.push raises: [].}
import std/options, results, stew/byteutils, chronicles, chronos, metrics, bearssl/rand
import
../node/peer_manager/peer_manager,
../waku_core,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
logScope:
topics = "waku lightpush legacy"
type WakuLegacyLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
proc handleRequest*(
wl: WakuLegacyLightPush, peerId: PeerId, buffer: seq[byte]
): Future[PushRPC] {.async.} =
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
pushResponseInfo = ""
requestId = ""
if reqDecodeRes.isErr():
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
else:
let pushRpcRequest = reqDecodeRes.get()
requestId = pushRpcRequest.requestId
let
request = pushRpcRequest.request
pubSubTopic = request.get().pubSubTopic
message = request.get().message
let msg_hash = pubsubTopic.computeMessageHash(message).to0xHex()
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
notice "handling legacy lightpush request",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(pubsubTopic, message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error = pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
let rpc = PushRPC(requestId: requestId, response: some(response))
return rpc
proc initProtocolHandler(wl: WakuLegacyLightPush) =
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var rpc: PushRPC
defer:
await conn.closeWithEOF()
wl.requestRateLimiter.checkUsageLimit(WakuLegacyLightPushCodec, conn):
var buffer: seq[byte]
try:
buffer = await conn.readLp(DefaultMaxRpcSize)
except LPStreamError:
error "lightpush legacy read stream failed", error = getCurrentExceptionMsg()
return
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuLegacyLightPushCodec, "in"]
)
try:
rpc = await handleRequest(wl, conn.peerId, buffer)
except CatchableError:
error "lightpush legacy handleRequest failed", error = getCurrentExceptionMsg()
do:
info "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
PushRPC(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
response:
some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))),
)
)
try:
await conn.writeLp(rpc.encode().buffer)
except LPStreamError:
error "lightpush legacy write stream failed", error = getCurrentExceptionMsg()
## For lightpush might not worth to measure outgoing trafic as it is only
## small respones about success/failure
wl.handler = handler
wl.codec = WakuLegacyLightPushCodec
proc new*(
T: type WakuLegacyLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLegacyLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLegacyLightPushCodec, rateLimitSetting)
return wl