Addressing all remaining review findings. Removed leftovers. Fixed logging and typos

This commit is contained in:
NagyZoltanPeter 2026-01-28 20:36:03 +01:00
parent c00ad8d876
commit cc32e91074
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
15 changed files with 58 additions and 75 deletions

View File

@ -57,14 +57,15 @@ proc send*(
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr:
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:
return err("API send: Failed to create delivery task: " & error)
info "API send: scheduling delivery task",
requestId = $requestId,
pubsubTopic = deliveryTask.pubsubTopic,
contentTopic = deliveryTask.msg.contentTopic,
msgHash = deliveryTask.msgHash.shortLog()
msgHash = deliveryTask.msgHash.to0xHex(),
myPeerId = w.node.peerId()
asyncSpawn w.deliveryService.sendService.send(deliveryTask)

View File

@ -7,19 +7,19 @@ This document collects logic and todo's around the Send API.
## Overview
The Send API hides the complex logic of using raw protocols for reliable message delivery.
The delivery method is chosen based on the node configuration and actual availabilites of peers.
The delivery method is chosen based on the node configuration and actual availabilities of peers.
## Delivery task
Each message send request is boundled into a task that not just holds the composed message but also the state of the delivery.
Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery.
## Delivery methods
Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes:
- P2PReliability validation - checking network store node wheather the message is reached at least a store node.
- P2PReliability validation - querying a network store node to verify whether the message has reached at least one store node.
- Simple retry until message is propagated to the network
- Relay says >0 peers as publish result
- LightushClient returns with success
- LightpushClient returns with success
Depending on node config:
- Relay
@ -28,9 +28,9 @@ Depending on node config:
These methods are used in combination to achieve the best reliability.
Fallback mechanism is used to switch between methods if the current one fails.
Relay+StoreCheck -> Relay+simeple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
Combination is dynamicly chosen on node configuration. Levels can be skipped depending on actual connectivity.
Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity.
Actual connectivity is checked:
- Relay's topic health check - at least dLow peers in the mesh for the topic
- Store nodes availability - at least one store service node is available in peer manager

View File

@ -1,14 +1,11 @@
{.push raises: [].}
import bearssl/rand, std/times, chronos, chronicles
import bearssl/rand, std/times, chronos
import stew/byteutils
import waku/utils/requests as request_utils
import waku/waku_core/[topics/content_topic, message/message, time]
import waku/requests/requests
logScope:
topics = "message envelope"
type
MessageEnvelope* = object
contentTopic*: ContentTopic

View File

@ -367,9 +367,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
await waku_dnsdisc.retrieveDynamicBootstrapNodes(
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
)
except CatchableError:
except CatchableError as exc:
Result[seq[RemotePeerInfo], string].err(
"Retrieving dynamic bootstrap nodes failed: " & getCurrentExceptionMsg()
"Retrieving dynamic bootstrap nodes failed: " & exc.msg
)
if dynamicBootstrapNodesRes.isErr():
@ -388,7 +388,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
(await updateWaku(waku)).isOkOr:
return err("Error in updateApp: " & $error)
except CatchableError:
return err("Error in updateApp: " & getCurrentExceptionMsg())
return err("Caught exception in updateApp: " & getCurrentExceptionMsg())
## Discv5
if conf.discv5Conf.isSome():
@ -464,9 +464,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
filterClientReady = filterClientReady,
details = $(healthReport)
ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError:
err("Failed to read health report: " & getCurrentExceptionMsg()),
return ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError as exc:
err("Failed to read health report: " & exc.msg),
).isOkOr:
error "Failed to set RequestNodeHealth provider", error = error
@ -496,7 +496,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
return err("Starting monitoring and external interfaces failed: " & error)
except CatchableError:
return err(
"Starting monitoring and external interfaces failed: " & getCurrentExceptionMsg()
"Caught exception starting monitoring and external interfaces failed: " &
getCurrentExceptionMsg()
)
waku[].healthMonitor.setOverallHealth(HealthStatus.READY)
@ -545,7 +546,4 @@ proc isModeEdgeAvailable*(waku: Waku): bool =
waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and
not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil()
proc isP2PReliabilityEnabled*(waku: Waku): bool =
return not waku.deliveryService.isNil()
{.pop.}

View File

@ -196,7 +196,7 @@ proc startRecvService*(self: RecvService) =
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} =
proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
@ -204,7 +204,7 @@ proc startRecvService*(self: RecvService) =
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} =
proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error

View File

@ -5,10 +5,12 @@ import waku/common/broker/broker_context
type DeliveryState* {.pure.} = enum
Entry
SuccessfullyPropagated
# message is known to be sent to the network but not yet validated
SuccessfullyValidated
FallbackRetry
NextRoundRetry
FailedToDeliver
# message is known to be stored at least on one store node, thus validated
FallbackRetry # retry sending with fallback processor if available
NextRoundRetry # try sending in next loop
FailedToDeliver # final state of failed delivery
type DeliveryTask* = ref object
requestId*: RequestId
@ -21,7 +23,7 @@ type DeliveryTask* = ref object
propagateEventEmitted*: bool
errorDesc*: string
proc create*(
proc new*(
T: typedesc[DeliveryTask],
requestId: RequestId,
envelop: MessageEnvelope,
@ -32,8 +34,8 @@ proc create*(
let relayShardRes = (
RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic)
).valueOr:
echo "RequestRelayShard.request error", $error
return err($error)
error "RequestRelayShard.request failed", error = error
return err("Failed create DeliveryTask: " & $error)
let pubsubTopic = relayShardRes.relayShard.toPubsubTopic()
let msgHash = computeMessageHash(pubsubTopic, msg)

View File

@ -45,15 +45,16 @@ method sendImpl*(
tryCount = task.tryCount
let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr:
debug "No peer available for Lightpush, request pushed back for next round",
requestId = task.requestId
task.state = DeliveryState.NextRoundRetry
return
let pushResult =
let numLightpushServers = (
await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer)
if pushResult.isErr:
error "LightpushSendProcessor sendImpl failed",
error = pushResult.error.desc.get($pushResult.error.code)
case pushResult.error.code
).valueOr:
error "LightpushSendProcessor.sendImpl failed", error = error.desc.get($error.code)
case error.code
of LightPushErrorCode.NO_PEERS_TO_RELAY, LightPushErrorCode.TOO_MANY_REQUESTS,
LightPushErrorCode.OUT_OF_RLN_PROOF, LightPushErrorCode.SERVICE_NOT_AVAILABLE,
LightPushErrorCode.INTERNAL_SERVER_ERROR:
@ -61,11 +62,11 @@ method sendImpl*(
else:
# the message is malformed, send error
task.state = DeliveryState.FailedToDeliver
task.errorDesc = pushResult.error.desc.get($pushResult.error.code)
task.errorDesc = error.desc.get($error.code)
task.deliveryTime = Moment.now()
return
if pushResult.isOk and pushResult.get() > 0:
if numLightpushServers > 0:
info "Message propagated via Lightpush",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
task.state = DeliveryState.SuccessfullyPropagated
@ -73,6 +74,8 @@ method sendImpl*(
# TODO: with a simple retry processor it might be more accurate to say `Sent`
else:
# Controversial state, publish says ok but no peer. It should not happen.
debug "Lightpush publish returned zero peers, request pushed back for next round",
requestId = task.requestId
task.state = DeliveryState.NextRoundRetry
return

View File

@ -1,5 +1,5 @@
import chronos, chronicles
import std/options
import chronos, chronicles
import waku/[waku_core], waku/waku_lightpush/[common, rpc]
import waku/requests/health_request
import waku/common/broker/broker_context
@ -14,7 +14,7 @@ type RelaySendProcessor* = ref object of BaseSendProcessor
fallbackStateToSet: DeliveryState
proc new*(
T: type RelaySendProcessor,
T: typedesc[RelaySendProcessor],
lightpushAvailable: bool,
publishProc: PushMessageHandler,
brokerCtx: BrokerContext,
@ -50,7 +50,7 @@ method isValidProcessor*(
# return self.isTopicHealthy(task.pubsubTopic)
return true
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} =
task.tryCount.inc()
info "Trying message delivery via Relay",
requestId = task.requestId,
@ -70,7 +70,7 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.a
if noOfPublishedPeers > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash
requestId = task.requestId, msgHash = task.msgHash.to0xHex(), noOfPeers = noOfPublishedPeers
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
else:

View File

@ -95,7 +95,10 @@ proc setupSendProcessorChain(
return ok(processors[0])
proc new*(
T: type SendService, preferP2PReliability: bool, w: WakuNode, s: SubscriptionService
T: typedesc[SendService],
preferP2PReliability: bool,
w: WakuNode,
s: SubscriptionService,
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(
@ -107,7 +110,7 @@ proc new*(
let sendProcessorChain = setupSendProcessorChain(
w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx
).valueOr:
return err(error)
return err("failed to setup SendProcessorChain: " & $error)
let sendService = SendService(
brokerCtx: w.brokerCtx,
@ -137,6 +140,7 @@ proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.a
return
var hashesToValidate = tasksToValidate.mapIt(it.msgHash)
# TODO: confirm hash format for store query!!!
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
@ -172,7 +176,7 @@ proc checkStoredMessages(self: SendService) {.async.} =
proc reportTaskResult(self: SendService, task: DeliveryTask) =
case task.state
of DeliveryState.SuccessfullyPropagated:
# TODO: in case of of unable to strore check messages shall we report success instead?
# TODO: in case we are unable to store-check messages, shall we report success instead?
if not task.propagateEventEmitted:
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
@ -239,7 +243,7 @@ proc serviceLoop(self: SendService) {.async.} =
await self.checkStoredMessages()
self.evaluateAndCleanUp()
## TODO: add circuit breaker to avoid infinite looping in case of persistent failures
## Use OnlienStateChange observers to pause/resume the loop
## Use OnlineStateChange observers to pause/resume the loop
await sleepAsync(ServiceLoopInterval)
proc startSendService*(self: SendService) =

View File

@ -194,7 +194,7 @@ proc publish*(
let numPeers = (await node.wakuRelay.publish(pubsubTopic, message)).valueOr:
warn "waku.relay did not publish", error = error
# Todo: If NoPeersToPublish, we might want to return ok(0) instead!!!
return err($error)
return err("publish failed in relay: " & $error)
notice "waku.relay published",
peerId = node.peerId,

View File

@ -57,7 +57,6 @@ import
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
common/broker/broker_context,
waku_mix,
requests/node_requests,
common/broker/broker_context,
@ -470,7 +469,7 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
return ok()
proc startProvidersAndListeners*(node: WakuNode) =
proc startProvidersAndListeners(node: WakuNode) =
RequestRelayShard.setProvider(
node.brokerCtx,
proc(
@ -480,9 +479,9 @@ proc startProvidersAndListeners*(node: WakuNode) =
return err($error)
return ok(RequestRelayShard(relayShard: shard)),
).isOkOr:
error "Can't set proveder for RequestRelayShard", error = error
error "Can't set provider for RequestRelayShard", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
proc stopProvidersAndListeners(node: WakuNode) =
RequestRelayShard.clearProvider(node.brokerCtx)
proc start*(node: WakuNode) {.async.} =

View File

@ -1,20 +0,0 @@
type
EventEmitter* = object
# Placeholder for future event emitter implementation
observers*: seq[proc (data: EventData): void]
proc initEventEmitter*(): EventEmitter =
EventEmitter(observers: @[])
proc emitEvent*(emitter: var EventEmitter, data: EventData) =
for observer in emitter.observers:
asyncSpawn observer(data)
proc subscribeToEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) =
emitter.observers.add(observer)
proc unsubscribeFromEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) =
emitter.observers = emitter.observers.filterIt(it != observer)

View File

@ -10,7 +10,8 @@ import
bearssl/rand,
stew/byteutils
import
waku/[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context],
waku/
[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context],
./common,
./protocol_metrics,
./rpc_codec,
@ -150,7 +151,7 @@ proc unsubscribe*(
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
OnFilterUnSubscribeEvent.emit(pubSubTopic, contentTopicSeq)
OnFilterUnSubscribeEvent.emit(wfc.brokerCtx, pubsubTopic, contentTopicSeq)
return ok()

View File

@ -106,8 +106,6 @@ proc publish*(
let relayPeerCount =
when dest is Connection:
?await wl.sendPushRequest(request, dest.peerId, some(dest))
elif dest is RemotePeerInfo:
?await wl.sendPushRequest(request, dest)
else:
?await wl.sendPushRequest(request, dest)

View File

@ -459,7 +459,7 @@ proc mount(
return ok(RequestGenerateRlnProof(proof: proof))
).isOkOr:
return err("Proof generator provider cannot be set")
return err("Proof generator provider cannot be set: " & $error)
return ok(wakuRlnRelay)