Introduce api/send

Added events and requests for support.
Reworked delivery_monitor into a featured devlivery_service, that
- supports relay publish and lightpush depending on configuration but with fallback options
- if available and configured it utilizes store api to confirm message delivery
- emits message delivery events accordingly

Notice: There are parts still in WIP and needs review and follow ups.

prepare for use in api_example
This commit is contained in:
NagyZoltanPeter 2025-11-21 16:11:58 +01:00
parent ae74b9018a
commit 70c3afb4a7
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
54 changed files with 1152 additions and 515 deletions

View File

@ -147,7 +147,7 @@ NIM_PARAMS := $(NIM_PARAMS) -d:disable_libbacktrace
endif
# enable experimental exit is dest feature in libp2p mix
NIM_PARAMS := $(NIM_PARAMS) -d:libp2p_mix_experimental_exit_is_dest
NIM_PARAMS := $(NIM_PARAMS) -d:libp2p_mix_experimental_exit_is_dest
libbacktrace:
+ $(MAKE) -C vendor/nim-libbacktrace --no-print-directory BUILD_CXX_LIB=0
@ -266,6 +266,10 @@ lightpushwithmix: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim lightpushwithmix $(NIM_PARAMS) waku.nims
api_example: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims
build/%: | build deps librln
echo -e $(BUILD_MSG) "build/$*" && \
$(ENV_SCRIPT) nim buildone $(NIM_PARAMS) waku.nims $*

View File

@ -7,6 +7,22 @@ type CliArgs = object
defaultValue: "", desc: "ETH RPC Endpoint, if passed, RLN is enabled"
.}: string
proc periodicSender(w: Waku): Future[void] {.async.} =
## Periodically sends a Waku message every 30 seconds
var counter = 0
while true:
let envelope = MessageEnvelope.init(
contentTopic = "example/content/topic",
payload = "Hello Waku! Message number: " & $counter,
)
let sendRequestId = (await w.send(envelope)).valueOr:
echo "Failed to send message: ", error
quit(QuitFailure)
counter += 1
await sleepAsync(30.seconds)
when isMainModule:
let args = CliArgs.load()
@ -21,7 +37,7 @@ when isMainModule:
)
else:
# Connect to TWN, use ETH RPC Endpoint for RLN
NodeConfig.init(ethRpcEndpoints = @[args.ethRpcEndpoint])
NodeConfig.init(mode = WakuMode.Core, ethRpcEndpoints = @[args.ethRpcEndpoint])
# Create the node using the library API's createNode function
let node = (waitFor createNode(config)).valueOr:
@ -37,4 +53,6 @@ when isMainModule:
echo "Node started successfully!"
asyncSpawn periodicSender(node)
runForever()

View File

@ -8,7 +8,8 @@ import
waku/common/logging,
waku/factory/waku,
waku/node/peer_manager,
waku/waku_relay/[protocol, topic_health],
waku/waku_relay/protocol,
waku/node/health_monitor/topic_health,
waku/waku_core/[topics/pubsub_topic, message],
./waku_thread_requests/[waku_thread_request, requests/debug_node_request],
./ffi_types,

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
Subproject commit 0309685cd27d4bf763c8b3be86a76c33bcfe67ea

View File

@ -1,10 +1,10 @@
## Main module for using nwaku as a Nimble library
##
##
## This module re-exports the public API for creating and managing Waku nodes
## when using nwaku as a library dependency.
import waku/api/[api, api_conf]
export api, api_conf
import waku/api/[api, api_conf, types]
export api, api_conf, types
import waku/factory/waku
export waku

View File

@ -177,6 +177,10 @@ task lightpushwithmix, "Build lightpushwithmix":
let name = "lightpush_publisher_mix"
buildBinary name, "examples/lightpush_mix/"
task api_example, "Build api_example":
let name = "api_example"
buildBinary name, "examples/api_example/"
task buildone, "Build custom target":
let filepath = paramStr(paramCount())
discard buildModule filepath

View File

@ -1,8 +1,12 @@
import chronicles, chronos, results
import waku/factory/waku
import waku/[requests/health_request, waku_core, waku_node]
import waku/node/delivery_service/send_service
import ./[api_conf, types], ./subscribe/subscribe
import ./api_conf
logScope:
topics = "api"
# TODO: Specs says it should return a `WakuNode`. As `send` and other APIs are defined, we can align.
proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} =
@ -15,3 +19,45 @@ proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} =
return err("Failed setting up Waku: " & $error)
return ok(wakuRes)
proc checkApiAvailability(w: Waku): Result[void, string] =
if w.isNil():
return err("Waku node is not initialized")
# check if health is satisfactory
# If Node is not healthy, return err("Waku node is not healthy")
let healthStatus = waitFor RequestNodeHealth.request()
if healthStatus.isErr():
warn "Failed to get Waku node health status: ", error = healthStatus.error
# Let's suppose the node is hesalthy enough, go ahead
else:
if healthStatus.get().healthStatus != NodeHealth.Unhealthy:
return err("Waku node is not healthy, has got no connections.")
return ok()
proc subscribe*(
w: Waku, contentTopic: ContentTopic
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let requestId = newRequestId(w.rng)
asyncSpawn w.subscribeImpl(requestId, contentTopic)
return ok(requestId)
proc send*(
w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let requestId = newRequestId(w.rng)
let deliveryTask = DeliveryTask.create(requestId, envelope).valueOr:
return err("Failed to create delivery task: " & error)
asyncSpawn w.deliveryService.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -131,8 +131,9 @@ proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] =
b.rateLimitConf.withRateLimits(@["filter:100/1s", "lightpush:5/1s", "px:5/1s"])
of Edge:
return err("Edge mode is not implemented")
# All client side protocols are mounted by default
# Peer exchange client is always enabled and start_node will start the px loop
discard
## Network Conf
let protocolsConfig = nodeConfig.protocolsConfig

13
waku/api/request_id.nim Normal file
View File

@ -0,0 +1,13 @@
{.push raises: [].}
import bearssl/rand
import waku/utils/requests as request_utils
import ./types
proc newRequestId*(rng: ref HmacDrbgContext): RequestId =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
{.pop.}

46
waku/api/send_api.md Normal file
View File

@ -0,0 +1,46 @@
# SEND API
**THIS IS TO BE REMOVED BEFORE PR MERGE**
This document collects logic and todo's around the Send API.
## Overview
Send api hides the complex logic of using raw protocols for reliable message delivery.
The delivery method is chosen based on the node configuration and actual availabilites of peers.
## Delivery task
Each message send request is boundled into a task that not just holds the composed message but also the state of the delivery.
## Delivery methods
Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes:
- P2PReliability validation - checking network store node wheather the message is reached at least a store node.
- Simple retry until message is propagated to the network
- Relay says >0 peers as publish result
- LightushClient returns with success
Depending on node config:
- Relay
- Lightpush
These methods are used in combination to achieve the best reliability.
Fallback mechanism is used to switch between methods if the current one fails.
Relay+StoreCheck -> Relay+simeple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
Combination is dynamicly chosen on node configuration. Levels can be skipped depending on actual connectivity.
Actual connectivity is checked:
- Relay's topic health check - at least dLow peers in the mesh for the topic
- Store nodes availability - at least one store service node is available in peer manager
- Lightpush client availability - at least one lightpush service node is available in peer manager
## Delivery processing
At every send request, each task is tried to be delivered right away.
Any further retries and store check is done as a background task in a loop with predefined intervals.
Each task is set for a maximum number of retries and/or maximum time to live.
In each round of store check and retry send tasks are selected based on their state.
The state is updated based on the result of the delivery method.

View File

@ -0,0 +1,12 @@
# import chronicles, chronos, results
import chronos
import waku/waku_core
import waku/api/types
import waku/factory/waku
proc subscribeImpl*(
w: Waku, requestId: RequestId, contentTopic: ContentTopic
): Future[void] {.async.} =
## Implementation of the subscribe API
## This is a placeholder implementation
await sleepAsync(1000) # Simulate async work

64
waku/api/types.nim Normal file
View File

@ -0,0 +1,64 @@
{.push raises: [].}
import bearssl/rand, std/times, chronos, chronicles
import stew/byteutils
import waku/utils/requests as request_utils
import waku/waku_core/[topics/content_topic, message/message, time]
import waku/requests/requests
logScope:
topics = "message envelope"
type
MessageEnvelope* = object
contentTopic*: ContentTopic
payload*: seq[byte]
ephemeral*: bool
RequestId* = distinct string
NodeHealth* {.pure.} = enum
Healthy
MinimallyHealthy
Unhealthy
proc newRequestId*(rng: ref HmacDrbgContext): RequestId =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,
payload: seq[byte] | string,
ephemeral: bool = false,
): MessageEnvelope =
when payload is seq[byte]:
MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral)
else:
MessageEnvelope(
contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral
)
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
## Convert a MessageEnvelope to a WakuMessage.
var wm = WakuMessage(
contentTopic: envelope.contentTopic,
payload: envelope.payload,
ephemeral: envelope.ephemeral,
timestamp: getNanosecondTime(getTime().toUnixFloat()),
)
# TODO: First find out if proof is needed at all
let requestedProof = (
waitFor RequestGenerateRlnProof.request(wm, getTime().toUnixFloat())
).valueOr:
warn "Failed to add RLN proof to WakuMessage: ", error = error
return wm
wm.proof = requestedProof.proof
return wm
{.pop.}

View File

@ -0,0 +1,27 @@
import waku/waku_core/[message/message, message/digest], waku/common/broker/event_broker
type DeliveryDirection* {.pure.} = enum
PUBLISHING
RECEIVING
type DeliverySuccess* {.pure.} = enum
SUCCESSFUL
UNSUCCESSFUL
EventBroker:
type DeliveryFeedbackEvent* = ref object
success*: DeliverySuccess
dir*: DeliveryDirection
comment*: string
msgHash*: WakuMessageHash
msg*: WakuMessage
EventBroker:
type OnFilterSubscribeEvent* = object
pubsubTopic*: string
contentTopics*: seq[string]
EventBroker:
type OnFilterUnSubscribeEvent* = object
pubsubTopic*: string
contentTopics*: seq[string]

3
waku/events/events.nim Normal file
View File

@ -0,0 +1,3 @@
import ./[message_events, delivery_events]
export message_events, delivery_events

View File

@ -0,0 +1,23 @@
import waku/common/broker/event_broker
import waku/api/types
export types
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string

View File

@ -27,7 +27,7 @@ import
../node/peer_manager,
../node/health_monitor,
../node/waku_metrics,
../node/delivery_monitor/delivery_monitor,
../node/delivery_service/delivery_service,
../rest_api/message_cache,
../rest_api/endpoint/server,
../rest_api/endpoint/builder as rest_server_builder,
@ -69,7 +69,7 @@ type Waku* = ref object
healthMonitor*: NodeHealthMonitor
deliveryMonitor: DeliveryMonitor
deliveryService*: DeliveryService
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
@ -200,16 +200,10 @@ proc new*(
return err("Failed setting up app callbacks: " & $error)
## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if wakuConf.p2pReliability:
if wakuConf.remoteStoreNode.isNone():
return err("A storenode should be set when reliability mode is on")
let deliveryMonitor = DeliveryMonitor.new(
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
node.wakuFilterClient,
).valueOr:
return err("could not create delivery monitor: " & $error)
let deliveryService = DeliveryService.new(
wakuConf.p2pReliability, node,
).valueOr:
return err("could not create delivery service: " & $error)
var waku = Waku(
version: git_version,
@ -218,7 +212,7 @@ proc new*(
key: wakuConf.nodeKey,
node: node,
healthMonitor: healthMonitor,
deliveryMonitor: deliveryMonitor,
deliveryService: deliveryService,
appCallbacks: appCallbacks,
restServer: restServer,
)
@ -403,8 +397,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
return err("failed to start waku discovery v5: " & $error)
## Reliability
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()
if not waku[].deliveryService.isNil():
waku[].deliveryService.startDeliveryService()
## Health Monitor
waku[].healthMonitor.startHealthMonitor().isOkOr:
@ -463,3 +457,16 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
if not waku.restServer.isNil():
await waku.restServer.stop()
proc isModeCoreAvailable*(waku: Waku): bool =
return not waku.node.wakuRelay.isNil()
proc isModeEdgeAvailable*(waku: Waku): bool =
return
waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and
not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil()
proc isP2PReliabilityEnabled*(waku: Waku): bool =
return not waku.deliveryService.isNil()
{.pop.}

View File

@ -1,17 +0,0 @@
import ../../waku_core
type DeliveryDirection* {.pure.} = enum
PUBLISHING
RECEIVING
type DeliverySuccess* {.pure.} = enum
SUCCESSFUL
UNSUCCESSFUL
type DeliveryFeedbackCallback* = proc(
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].}

View File

@ -1,43 +0,0 @@
## This module helps to ensure the correct transmission and reception of messages
import results
import chronos
import
./recv_monitor,
./send_monitor,
./delivery_callback,
../../waku_core,
../../waku_store/client,
../../waku_relay/protocol,
../../waku_lightpush/client,
../../waku_filter_v2/client
type DeliveryMonitor* = ref object
sendMonitor: SendMonitor
recvMonitor: RecvMonitor
proc new*(
T: type DeliveryMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightpushClient,
wakuFilterClient: WakuFilterClient,
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryMonitor
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish
let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient)
let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient)
return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor))
proc startDeliveryMonitor*(self: DeliveryMonitor) =
self.sendMonitor.startSendMonitor()
self.recvMonitor.startRecvMonitor()
proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} =
self.sendMonitor.stopSendMonitor()
await self.recvMonitor.stopRecvMonitor()
proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) =
## The deliveryCb is a proc defined by the api client so that it can get delivery feedback
self.sendMonitor.setDeliveryCallback(deliveryCb)
self.recvMonitor.setDeliveryCallback(deliveryCb)

View File

@ -1,9 +0,0 @@
import chronicles
import ../../waku_core/message/message
type PublishObserver* = ref object of RootObj
method onMessagePublished*(
self: PublishObserver, pubsubTopic: string, message: WakuMessage
) {.base, gcsafe, raises: [].} =
error "onMessagePublished not implemented"

View File

@ -1,212 +0,0 @@
## This module reinforces the publish operation with regular store-v3 requests.
##
import std/[sequtils, tables]
import chronos, chronicles, libp2p/utility
import
./delivery_callback,
./publish_observer,
../../waku_core,
./not_delivered_storage/not_delivered_storage,
../../waku_store/[client, common],
../../waku_archive/archive,
../../waku_relay/protocol,
../../waku_lightpush/client
const MaxTimeInCache* = chronos.minutes(1)
## Messages older than this time will get completely forgotten on publication and a
## feedback will be given when that happens
const SendCheckInterval* = chronos.seconds(3)
## Interval at which we check that messages have been properly received by a store node
const MaxMessagesToCheckAtOnce = 100
## Max number of messages to check if they were properly archived by a store node
const ArchiveTime = chronos.seconds(3)
## Estimation of the time we wait until we start confirming that a message has been properly
## received and archived by a store node
type DeliveryInfo = object
pubsubTopic: string
msg: WakuMessage
type SendMonitor* = ref object of PublishObserver
publishedMessages: Table[WakuMessageHash, DeliveryInfo]
## Cache that contains the delivery info per message hash.
## This is needed to make sure the published messages are properly published
msgStoredCheckerHandle: Future[void] ## handle that allows to stop the async task
notDeliveredStorage: NotDeliveredStorage
## NOTE: this is not fully used because that might be tackled by higher abstraction layers
storeClient: WakuStoreClient
deliveryCb: DeliveryFeedbackCallback
wakuRelay: protocol.WakuRelay
wakuLightpushClient: WakuLightPushClient
proc new*(
T: type SendMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
): Result[T, string] =
if wakuRelay.isNil() and wakuLightpushClient.isNil():
return err(
"Could not create SendMonitor. wakuRelay or wakuLightpushClient should be set"
)
let notDeliveredStorage = ?NotDeliveredStorage.new()
let sendMonitor = SendMonitor(
notDeliveredStorage: notDeliveredStorage,
storeClient: storeClient,
wakuRelay: wakuRelay,
wakuLightpushClient: wakuLightPushClient,
)
if not wakuRelay.isNil():
wakuRelay.addPublishObserver(sendMonitor)
if not wakuLightpushClient.isNil():
wakuLightpushClient.addPublishObserver(sendMonitor)
return ok(sendMonitor)
proc performFeedbackAndCleanup(
self: SendMonitor,
msgsToDiscard: Table[WakuMessageHash, DeliveryInfo],
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
) =
## This procs allows to bring delivery feedback to the API client
## It requires a 'deliveryCb' to be registered beforehand.
if self.deliveryCb.isNil():
error "deliveryCb is nil in performFeedbackAndCleanup",
success, dir, comment, hashes = toSeq(msgsToDiscard.keys).mapIt(shortLog(it))
return
for hash, deliveryInfo in msgsToDiscard:
info "send monitor performFeedbackAndCleanup",
success, dir, comment, msg_hash = shortLog(hash)
self.deliveryCb(success, dir, comment, hash, deliveryInfo.msg)
self.publishedMessages.del(hash)
proc checkMsgsInStore(
self: SendMonitor, msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
): Future[
Result[
tuple[
publishedCorrectly: Table[WakuMessageHash, DeliveryInfo],
notYetPublished: Table[WakuMessageHash, DeliveryInfo],
],
void,
]
] {.async.} =
let hashesToValidate = toSeq(msgsToValidate.keys)
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
)
).valueOr:
error "checkMsgsInStore failed to get remote msgHashes",
hashes = hashesToValidate.mapIt(shortLog(it)), error = $error
return err()
let publishedHashes = storeResp.messages.mapIt(it.messageHash)
var notYetPublished: Table[WakuMessageHash, DeliveryInfo]
var publishedCorrectly: Table[WakuMessageHash, DeliveryInfo]
for msgHash, deliveryInfo in msgsToValidate.pairs:
if publishedHashes.contains(msgHash):
publishedCorrectly[msgHash] = deliveryInfo
self.publishedMessages.del(msgHash) ## we will no longer track that message
else:
notYetPublished[msgHash] = deliveryInfo
return ok((publishedCorrectly: publishedCorrectly, notYetPublished: notYetPublished))
proc processMessages(self: SendMonitor) {.async.} =
var msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
var msgsToDiscard: Table[WakuMessageHash, DeliveryInfo]
let now = getNowInNanosecondTime()
let timeToCheckThreshold = now - ArchiveTime.nanos
let maxLifeTime = now - MaxTimeInCache.nanos
for hash, deliveryInfo in self.publishedMessages.pairs:
if deliveryInfo.msg.timestamp < maxLifeTime:
## message is too old
msgsToDiscard[hash] = deliveryInfo
if deliveryInfo.msg.timestamp < timeToCheckThreshold:
msgsToValidate[hash] = deliveryInfo
## Discard the messages that are too old
self.performFeedbackAndCleanup(
msgsToDiscard, DeliverySuccess.UNSUCCESSFUL, DeliveryDirection.PUBLISHING,
"Could not publish messages. Please try again.",
)
let (publishedCorrectly, notYetPublished) = (
await self.checkMsgsInStore(msgsToValidate)
).valueOr:
return ## the error log is printed in checkMsgsInStore
## Give positive feedback for the correctly published messages
self.performFeedbackAndCleanup(
publishedCorrectly, DeliverySuccess.SUCCESSFUL, DeliveryDirection.PUBLISHING,
"messages published correctly",
)
## Try to publish again
for msgHash, deliveryInfo in notYetPublished.pairs:
let pubsubTopic = deliveryInfo.pubsubTopic
let msg = deliveryInfo.msg
if not self.wakuRelay.isNil():
info "trying to publish again with wakuRelay", msgHash, pubsubTopic
(await self.wakuRelay.publish(pubsubTopic, msg)).isOkOr:
error "could not publish with wakuRelay.publish",
msgHash, pubsubTopic, error = $error
continue
if not self.wakuLightpushClient.isNil():
info "trying to publish again with wakuLightpushClient", msgHash, pubsubTopic
(await self.wakuLightpushClient.publishToAny(pubsubTopic, msg)).isOkOr:
error "could not publish with publishToAny", error = $error
continue
proc checkIfMessagesStored(self: SendMonitor) {.async.} =
## Continuously monitors that the sent messages have been received by a store node
while true:
await self.processMessages()
await sleepAsync(SendCheckInterval)
method onMessagePublished(
self: SendMonitor, pubsubTopic: string, msg: WakuMessage
) {.gcsafe, raises: [].} =
## Implementation of the PublishObserver interface.
##
## When publishing a message either through relay or lightpush, we want to add some extra effort
## to make sure it is received to one store node. Hence, keep track of those published messages.
info "onMessagePublished"
let msgHash = computeMessageHash(pubSubTopic, msg)
if not self.publishedMessages.hasKey(msgHash):
self.publishedMessages[msgHash] = DeliveryInfo(pubsubTopic: pubsubTopic, msg: msg)
proc startSendMonitor*(self: SendMonitor) =
self.msgStoredCheckerHandle = self.checkIfMessagesStored()
proc stopSendMonitor*(self: SendMonitor) =
discard self.msgStoredCheckerHandle.cancelAndWait()
proc setDeliveryCallback*(self: SendMonitor, deliveryCb: DeliveryFeedbackCallback) =
self.deliveryCb = deliveryCb

View File

@ -1,13 +0,0 @@
import chronicles
type SubscriptionObserver* = ref object of RootObj
method onSubscribe*(
self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string]
) {.base, gcsafe, raises: [].} =
error "onSubscribe not implemented"
method onUnsubscribe*(
self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string]
) {.base, gcsafe, raises: [].} =
error "onUnsubscribe not implemented"

View File

@ -0,0 +1,36 @@
## This module helps to ensure the correct transmission and reception of messages
import results
import chronos
import
./recv_service,
./send_service,
waku/[
waku_core,
waku_node,
waku_store/client,
waku_relay/protocol,
waku_lightpush/client,
waku_filter_v2/client,
]
type DeliveryService* = ref object
sendService*: SendService
recvService: RecvService
proc new*(
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryService
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
let sendService = ?SendService.new(useP2PReliability, w)
let recvService = RecvService.new(w)
return ok(DeliveryService(sendService: sendService, recvService: recvService))
proc startDeliveryService*(self: DeliveryService) =
self.sendService.startSendService()
self.recvService.startRecvService()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
self.sendService.stopSendService()
await self.recvService.stopRecvService()

View File

@ -4,7 +4,7 @@ import std/[tables, strutils, os], results, chronicles
import ../../../common/databases/db_sqlite, ../../../common/databases/common
logScope:
topics = "waku node delivery_monitor"
topics = "waku node delivery_service"
const TargetSchemaVersion* = 1
# increase this when there is an update in the database schema

View File

@ -1,17 +1,17 @@
## This module is aimed to keep track of the sent/published messages that are considered
## not being properly delivered.
##
##
## The archiving of such messages will happen in a local sqlite database.
##
##
## In the very first approach, we consider that a message is sent properly is it has been
## received by any store node.
##
##
import results
import
../../../common/databases/db_sqlite,
../../../waku_core/message/message,
../../../node/delivery_monitor/not_delivered_storage/migrations
../../../node/delivery_service/not_delivered_storage/migrations
const NotDeliveredMessagesDbUrl = "not-delivered-messages.db"

View File

@ -0,0 +1,3 @@
import ./recv_service/recv_service
export recv_service

View File

@ -5,12 +5,15 @@
import std/[tables, sequtils, options]
import chronos, chronicles, libp2p/utility
import
../../waku_core,
./delivery_callback,
./subscriptions_observer,
../../waku_store/[client, common],
../../waku_filter_v2/client,
../../waku_core/topics
waku/[
waku_core,
waku_store/client,
waku_store/common,
waku_filter_v2/client,
waku_core/topics,
events/delivery_events,
waku_node,
]
const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
@ -28,14 +31,14 @@ type RecvMessage = object
rxTime: Timestamp
## timestamp of the rx message. We will not keep the rx messages forever
type RecvMonitor* = ref object of SubscriptionObserver
type RecvService* = ref object of RootObj
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
## The key contains pubsub-topics
storeClient: WakuStoreClient
deliveryCb: DeliveryFeedbackCallback
node: WakuNode
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnsubscribeEventListener
recentReceivedMsgs: seq[RecvMessage]
@ -46,10 +49,10 @@ type RecvMonitor* = ref object of SubscriptionObserver
endTimeToCheck: Timestamp
proc getMissingMsgsFromStore(
self: RecvMonitor, msgHashes: seq[WakuMessageHash]
self: RecvService, msgHashes: seq[WakuMessageHash]
): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(includeData: true, messageHashes: msgHashes)
)
).valueOr:
@ -62,25 +65,21 @@ proc getMissingMsgsFromStore(
)
proc performDeliveryFeedback(
self: RecvMonitor,
self: RecvService,
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].} =
## This procs allows to bring delivery feedback to the API client
## It requires a 'deliveryCb' to be registered beforehand.
if self.deliveryCb.isNil():
error "deliveryCb is nil in performDeliveryFeedback",
success, dir, comment, msg_hash
return
info "recv monitor performDeliveryFeedback",
success, dir, comment, msg_hash = shortLog(msgHash)
self.deliveryCb(success, dir, comment, msgHash, msg)
proc msgChecker(self: RecvMonitor) {.async.} =
DeliveryFeedbackEvent.emit(
success = success, dir = dir, comment = comment, msgHash = msgHash, msg = msg
)
proc msgChecker(self: RecvService) {.async.} =
## Continuously checks if a message has been received
while true:
await sleepAsync(StoreCheckPeriod)
@ -90,7 +89,7 @@ proc msgChecker(self: RecvMonitor) {.async.} =
var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
@ -126,8 +125,8 @@ proc msgChecker(self: RecvMonitor) {.async.} =
## update next check times
self.startTimeToCheck = self.endTimeToCheck
method onSubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
proc onSubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
@ -135,8 +134,8 @@ method onSubscribe(
do:
self.topicsInterest[pubsubTopic] = contentTopics
method onUnsubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
proc onUnsubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onUnsubscribe", pubsubTopic, contentTopics
@ -150,19 +149,13 @@ method onUnsubscribe(
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
proc new*(
T: type RecvMonitor,
storeClient: WakuStoreClient,
wakuFilterClient: WakuFilterClient,
): T =
proc new*(T: type RecvService, node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now)
if not wakuFilterClient.isNil():
wakuFilterClient.addSubscrObserver(recvMonitor)
var recvService = RecvService(node: node, startTimeToCheck: now)
if not node.wakuFilterClient.isNil():
let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
@ -170,27 +163,40 @@ proc new*(
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvMonitor.recentReceivedMsgs.add(rxMsg)
recvService.recentReceivedMsgs.add(rxMsg)
wakuFilterClient.registerPushHandler(filterPushHandler)
node.wakuFilterClient.registerPushHandler(filterPushHandler)
return recvMonitor
return recvService
proc loopPruneOldMessages(self: RecvMonitor) {.async.} =
proc loopPruneOldMessages(self: RecvService) {.async.} =
while true:
let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos
self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime)
await sleepAsync(PruneOldMsgsPeriod)
proc startRecvMonitor*(self: RecvMonitor) =
proc startRecvService*(self: RecvService) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
proc stopRecvMonitor*(self: RecvMonitor) {.async.} =
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
proc(subsEv: OnFilterSubscribeEvent): Future[void] {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics)
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
quit(QuitFailure)
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
proc(subsEv: OnFilterUnsubscribeEvent): Future[void] {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics)
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
OnFilterSubscribeEvent.dropListener(self.onSubscribeListener)
OnFilterUnSubscribeEvent.dropListener(self.onUnsubscribeListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
if not self.msgPrunerHandler.isNil():
await self.msgPrunerHandler.cancelAndWait()
proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) =
self.deliveryCb = deliveryCb

View File

@ -0,0 +1,6 @@
## This module reinforces the publish operation with regular store-v3 requests.
##
import ./send_service/[send_service, delivery_task]
export send_service, delivery_task

View File

@ -0,0 +1,66 @@
import std/[options, times], chronos
import waku/waku_core, waku/api/types, waku/requests/node_requests
type DeliveryState* {.pure.} = enum
Entry
SuccessfullyPropagated
SuccessfullyValidated
FallbackRetry
NextRoundRetry
FailedToDeliver
type DeliveryTask* = ref object
requestId*: RequestId
pubsubTopic*: PubsubTopic
msg*: WakuMessage
msgHash*: WakuMessageHash
tryCount*: int
state*: DeliveryState
deliveryTime*: Moment
errorDesc*: string
proc create*(
T: type DeliveryTask, requestId: RequestId, envelop: MessageEnvelope
): Result[T, string] =
let msg = envelop.toWakuMessage()
# TODO: use sync request for such as soon as available
let relayShardRes = (
waitFor RequestRelayShard.request(none[PubsubTopic](), envelop.contentTopic)
).valueOr:
return err($error)
let pubsubTopic = relayShardRes.relayShard.toPubsubTopic()
let msgHash = computeMessageHash(pubsubTopic, msg)
return ok(
T(
requestId: requestId,
pubsubTopic: pubsubTopic,
msg: msg,
msgHash: msgHash,
tryCount: 0,
state: DeliveryState.Entry,
)
)
func `==`*(r, l: DeliveryTask): bool =
if r.isNil() == l.isNil():
r.isNil() or r.msgHash == l.msgHash
else:
false
proc messageAge*(self: DeliveryTask): timer.Duration =
let actual = getNanosecondTime(getTime().toUnixFloat())
if self.msg.timestamp >= 0 and self.msg.timestamp < actual:
nanoseconds(actual - self.msg.timestamp)
else:
ZeroDuration
proc deliveryAge*(self: DeliveryTask): timer.Duration =
if self.state == DeliveryState.SuccessfullyPropagated:
timer.Moment.now() - self.deliveryTime
else:
ZeroDuration
proc isEphemeral*(self: DeliveryTask): bool =
return self.msg.ephemeral

View File

@ -0,0 +1,74 @@
import chronicles, chronos, results
import std/options
import
waku/waku_node,
waku/waku_core,
waku/node/peer_manager,
waku/waku_lightpush/[callbacks, common, client, rpc]
import ./[delivery_task, send_processor]
logScope:
topics = "send service lightpush processor"
type LightpushSendProcessor* = ref object of BaseSendProcessor
peerManager: PeerManager
lightpushClient: WakuLightPushClient
proc new*(
T: type LightpushSendProcessor,
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
): T =
return T(peerManager: peerManager, lightpushClient: lightpushClient)
proc isLightpushPeerAvailable(
self: LightpushSendProcessor, pubsubTopic: PubsubTopic
): bool =
return self.peerManager.selectPeer(WakuLightPushCodec, some(pubsubTopic)).isSome()
method isValidProcessor*(
self: LightpushSendProcessor, task: DeliveryTask
): Future[bool] {.async.} =
return self.isLightpushPeerAvailable(task.pubsubTopic)
method sendImpl*(
self: LightpushSendProcessor, task: DeliveryTask
): Future[void] {.async.} =
task.tryCount.inc()
info "Trying message delivery via Lightpush",
requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount
let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr:
task.state = DeliveryState.NextRoundRetry
return
let pushResult =
await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer)
if pushResult.isErr:
error "LightpushSendProcessor sendImpl failed",
error = pushResult.error.desc.get($pushResult.error.code)
case pushResult.error.code
of LightPushErrorCode.NO_PEERS_TO_RELAY, LightPushErrorCode.TOO_MANY_REQUESTS,
LightPushErrorCode.OUT_OF_RLN_PROOF, LightPushErrorCode.SERVICE_NOT_AVAILABLE,
LightPushErrorCode.INTERNAL_SERVER_ERROR:
task.state = DeliveryState.NextRoundRetry
else:
# the message is malformed, send error
task.state = DeliveryState.FailedToDeliver
task.errorDesc = pushResult.error.desc.get($pushResult.error.code)
task.deliveryTime = Moment.now()
return
if pushResult.isOk and pushResult.get() > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
# TODO: with a simple retry processor it might be more accurate to say `Sent`
else:
# Controversial state, publish says ok but no peer. It should not happen.
task.state = DeliveryState.NextRoundRetry
return

View File

@ -0,0 +1,67 @@
import chronos, chronicles
import std/options
import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc]
import waku/requests/health_request
import waku/api/types
import ./[delivery_task, send_processor]
logScope:
topics = "send service relay processor"
type RelaySendProcessor* = ref object of BaseSendProcessor
publishProc: PushMessageHandler
fallbackStateToSet: DeliveryState
proc new*(
T: type RelaySendProcessor,
lightpushAvailable: bool,
publishProc: PushMessageHandler,
): RelaySendProcessor =
let fallbackStateToSet =
if lightpushAvailable:
DeliveryState.FallbackRetry
else:
DeliveryState.FailedToDeliver
return
RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet)
proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} =
let healthReport = (await RequestRelayTopicsHealth.request(@[topic])).valueOr:
return false
if healthReport.topicHealth.len() < 1:
return false
let health = healthReport.topicHealth[0].health
return health == MINIMALLY_HEALTHY or health == SUFFICIENTLY_HEALTHY
method isValidProcessor*(
self: RelaySendProcessor, task: DeliveryTask
): Future[bool] {.async.} =
return await isTopicHealthy(task.pubsubTopic)
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
task.tryCount.inc()
info "Trying message delivery via Relay",
requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount
let pushResult = await self.publishProc(task.pubsubTopic, task.msg)
if pushResult.isErr():
let errorMessage = pushResult.error.desc.get($pushResult.error.code)
error "Failed to publish message with relay",
request = task.requestId, msgHash = task.msgHash, error = errorMessage
if pushResult.error.code != LightPushErrorCode.NO_PEERS_TO_RELAY:
task.state = DeliveryState.FailedToDeliver
task.errorDesc = errorMessage
else:
task.state = self.fallbackStateToSet
return
if pushResult.isOk and pushResult.get() > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash
task.state = DeliveryState.SuccessfullyPropagated
task.deliveryTime = Moment.now()
else:
# It shall not happen, but still covering it
task.state = self.fallbackStateToSet

View File

@ -0,0 +1,32 @@
import chronos
import ./delivery_task
type BaseSendProcessor* = ref object of RootObj
fallbackProcessor*: BaseSendProcessor
proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) =
self.fallbackProcessor = next
method isValidProcessor*(
self: BaseSendProcessor, task: DeliveryTask
): Future[bool] {.async, base.} =
return false
method sendImpl*(
self: BaseSendProcessor, task: DeliveryTask
): Future[void] {.async, base.} =
assert false, "Not implemented"
method process*(
self: BaseSendProcessor, task: DeliveryTask
): Future[void] {.async, base.} =
var currentProcessor: BaseSendProcessor = self
var keepTrying = true
while not currentProcessor.isNil() and keepTrying:
if await currentProcessor.isValidProcessor(task):
await currentProcessor.sendImpl(task)
currentProcessor = currentProcessor.fallbackProcessor
keepTrying = task.state == DeliveryState.FallbackRetry
if task.state == DeliveryState.FallbackRetry:
task.state = DeliveryState.NextRoundRetry

View File

@ -0,0 +1,238 @@
## This module reinforces the publish operation with regular store-v3 requests.
##
import std/[sequtils, tables, options]
import chronos, chronicles, libp2p/utility
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
waku/[
waku_core,
node/waku_node,
node/peer_manager,
waku_store/client,
waku_store/common,
waku_archive/archive,
waku_relay/protocol,
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
events/delivery_events,
events/message_events,
]
logScope:
topics = "send service"
# This useful util is missing from sequtils, this extends applyIt with predicate...
template applyItIf*(varSeq, pred, op: untyped) =
for i in low(varSeq) .. high(varSeq):
let it {.inject.} = varSeq[i]
if pred:
op
varSeq[i] = it
template forEach*(varSeq, op: untyped) =
for i in low(varSeq) .. high(varSeq):
let it {.inject.} = varSeq[i]
op
const MaxTimeInCache* = chronos.minutes(1)
## Messages older than this time will get completely forgotten on publication and a
## feedback will be given when that happens
const ServiceLoopInterval* = chronos.seconds(1)
## Interval at which we check that messages have been properly received by a store node
const ArchiveTime = chronos.seconds(3)
## Estimation of the time we wait until we start confirming that a message has been properly
## received and archived by a store node
type SendService* = ref object of RootObj
taskCache: seq[DeliveryTask]
## Cache that contains the delivery task per message hash.
## This is needed to make sure the published messages are properly published
serviceLoopHandle: Future[void] ## handle that allows to stop the async task
sendProcessor: BaseSendProcessor
node: WakuNode
checkStoreForMessages: bool
proc setupSendProcessorChain(
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
relay: WakuRelay,
rlnRelay: WakuRLNRelay,
): Result[BaseSendProcessor, string] =
let isRelayAvail = not relay.isNil()
let isLightPushAvail = not lightpushClient.isNil()
if not isRelayAvail and not isLightPushAvail:
return err("No valid send processor found for the delivery task")
var processors = newSeq[BaseSendProcessor]()
if isRelayAvail:
let rln: Option[WakuRLNRelay] =
if rlnRelay.isNil():
none[WakuRLNRelay]()
else:
some(rlnRelay)
let publishProc = getRelayPushHandler(relay, rln)
processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc))
if isLightPushAvail:
processors.add(LightpushSendProcessor.new(peerManager, lightpushClient))
var currentProcessor: BaseSendProcessor = processors[0]
for i in 1 ..< processors.len():
currentProcessor.chain(processors[i])
currentProcessor = processors[i]
return ok(processors[0])
proc new*(
T: type SendService, preferP2PReliability: bool, w: WakuNode
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(
"Could not create SendService. wakuRelay or wakuLightpushClient should be set"
)
let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil()
let sendProcessorChain = setupSendProcessorChain(
w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay
).valueOr:
return err(error)
let sendService = SendService(
taskCache: newSeq[DeliveryTask](),
serviceLoopHandle: nil,
sendProcessor: sendProcessorChain,
node: w,
checkStoreForMessages: checkStoreForMessages,
)
return ok(sendService)
proc addTask(self: SendService, task: DeliveryTask) =
self.taskCache.addUnique(task)
proc isStorePeerAvailable*(sendService: SendService): bool =
return sendService.node.peerManager.selectPeer(WakuStoreCodec).isSome()
proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.async.} =
if tasksToValidate.len() == 0:
return
if not isStorePeerAvailable(self):
warn "Skipping store validation for ",
messageCount = tasksToValidate.len(), error = "no store peer available"
return
var hashesToValidate = tasksToValidate.mapIt(it.msgHash)
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
)
).valueOr:
error "Failed to get store validation for messages",
hashes = hashesToValidate.mapIt(shortLog(it)), error = $error
return
let storedItems = storeResp.messages.mapIt(it.messageHash)
# Set success state for messages found in store
self.taskCache.applyItIf(storedItems.contains(it.msgHash)):
it.state = DeliveryState.SuccessfullyValidated
# set retry state for messages not found in store
hashesToValidate.keepItIf(not storedItems.contains(it))
self.taskCache.applyItIf(hashesToValidate.contains(it.msgHash)):
it.state = DeliveryState.NextRoundRetry
proc checkStoredMessages(self: SendService) {.async.} =
if not self.checkStoreForMessages:
return
let tasksToValidate = self.taskCache.filterIt(
it.state == DeliveryState.SuccessfullyPropagated and it.deliveryAge() > ArchiveTime and
not it.isEphemeral()
)
await self.checkMsgsInStore(tasksToValidate)
proc reportTaskResult(self: SendService, task: DeliveryTask) =
case task.state
of DeliveryState.SuccessfullyPropagated:
# TODO: in case of of unable to strore check messages shall we report success instead?
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash
MessagePropagatedEvent.emit(task.requestId, task.msgHash.toString())
return
of DeliveryState.SuccessfullyValidated:
info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash
MessageSentEvent.emit(task.requestId, task.msgHash.toString())
return
of DeliveryState.FailedToDeliver:
error "Failed to send message",
requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc
MessageErrorEvent.emit(task.requestId, task.msgHash.toString(), task.errorDesc)
return
else:
# rest of the states are intermediate and does not translate to event
discard
if task.messageAge() > MaxTimeInCache:
error "Failed to send message",
requestId = task.requestId, msgHash = task.msgHash, error = "Message too old"
task.state = DeliveryState.FailedToDeliver
MessageErrorEvent.emit(
task.requestId, task.msgHash.toString(), "Unable to send within retry time window"
)
proc evaluateAndCleanUp(self: SendService) =
self.taskCache.forEach(self.reportTaskResult(it))
self.taskCache.keepItIf(
it.state != DeliveryState.SuccessfullyValidated or
it.state != DeliveryState.FailedToDeliver
)
# remove propagated ephemeral messages as no store check is possible
self.taskCache.keepItIf(
not (it.isEphemeral() and it.state == DeliveryState.SuccessfullyPropagated)
)
proc trySendMessages(self: SendService) {.async.} =
let tasksToSend = self.taskCache.filterIt(it.state == DeliveryState.NextRoundRetry)
for task in tasksToSend:
# Todo, check if it has any perf gain to run them concurrent...
await self.sendProcessor.process(task)
proc serviceLoop(self: SendService) {.async.} =
## Continuously monitors that the sent messages have been received by a store node
while true:
await self.trySendMessages()
await self.checkStoredMessages()
self.evaluateAndCleanUp()
## TODO: add circuit breaker to avoid infinite looping in case of persistent failures
## Use OnlienStateChange observers to pause/resume the loop
await sleepAsync(ServiceLoopInterval)
proc startSendService*(self: SendService) =
self.serviceLoopHandle = self.serviceLoop()
proc stopSendService*(self: SendService) =
if not self.serviceLoopHandle.isNil():
discard self.serviceLoopHandle.cancelAndWait()
proc send*(self: SendService, task: DeliveryTask): Future[void] {.async.} =
assert(not task.isNil(), "task for send must not be nil")
await self.sendProcessor.process(task)
reportTaskResult(self, task)
if task.state != DeliveryState.FailedToDeliver:
self.addTask(task)

View File

@ -1,11 +1,12 @@
import chronos
import ../waku_core
import waku/waku_core
type TopicHealth* = enum
UNHEALTHY
MINIMALLY_HEALTHY
SUFFICIENTLY_HEALTHY
NOT_SUBSCRIBED
proc `$`*(t: TopicHealth): string =
result =
@ -13,6 +14,7 @@ proc `$`*(t: TopicHealth): string =
of UNHEALTHY: "UnHealthy"
of MINIMALLY_HEALTHY: "MinimallyHealthy"
of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy"
of NOT_SUBSCRIBED: "NotSubscribed"
type TopicHealthChangeHandler* = proc(
pubsubTopic: PubsubTopic, topicHealth: TopicHealth

View File

@ -188,7 +188,6 @@ proc lightpushPublishHandler(
mixify: bool = false,
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
@ -196,21 +195,23 @@ proc lightpushPublishHandler(
target_peer_id = peer.peerId,
msg_hash = msgHash,
mixify = mixify
if mixify: #indicates we want to use mix to send the message
#TODO: How to handle multiple addresses?
let conn = node.wakuMix.toConnection(
MixDestination.exitNode(peer.peerId),
WakuLightPushCodec,
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
# indicating we only want a single path to be used for reply hence numSurbs = 1
).valueOr:
error "could not create mix connection"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
"Waku lightpush with mix not available",
)
if defined(libp2p_mix_experimental_exit_is_dest) and mixify:
#indicates we want to use mix to send the message
when defined(libp2p_mix_experimental_exit_is_dest):
#TODO: How to handle multiple addresses?
let conn = node.wakuMix.toConnection(
MixDestination.exitNode(peer.peerId),
WakuLightPushCodec,
MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
# indicating we only want a single path to be used for reply hence numSurbs = 1
).valueOr:
error "could not create mix connection"
return lighpushErrorResult(
LightPushErrorCode.SERVICE_NOT_AVAILABLE,
"Waku lightpush with mix not available",
)
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn)
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn)
else:
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
@ -259,7 +260,7 @@ proc lightpushPublish*(
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
)
let pubsubForPublish = pubSubTopic.valueOr:
let pubsubForPublish = pubsubTopic.valueOr:
if node.wakuAutoSharding.isNone():
let msg = "Pubsub topic must be specified when static sharding is enabled"
error "lightpush publish error", error = msg

View File

@ -165,7 +165,7 @@ proc unsubscribe*(
proc publish*(
node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage
): Future[Result[void, string]] {.async, gcsafe.} =
): Future[Result[int, string]] {.async, gcsafe.} =
## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard.
## `WakuMessage` should contain a `contentTopic` field for light node functionality.
## It is also used to determine the shard.
@ -184,16 +184,20 @@ proc publish*(
let msg = "Autosharding error: " & error
return err(msg)
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)
let numPeers = (await node.wakuRelay.publish(pubsubTopic, message)).valueOr:
warn "waku.relay did not publish", error = error
# Todo: If NoPeersToPublish, we might want to return ok(0) instead!!!
return err($error)
notice "waku.relay published",
peerId = node.peerId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
publishTime = getNowInNanosecondTime()
publishTime = getNowInNanosecondTime(),
numPeers = numPeers
return ok()
# TODO: investigate if we can return error in case numPeers is 0
ok(numPeers)
proc mountRelay*(
node: WakuNode,

View File

@ -27,38 +27,41 @@ import
libp2p/protocols/mix/mix_protocol
import
../waku_core,
../waku_core/topics/sharding,
../waku_relay,
../waku_archive,
../waku_archive_legacy,
../waku_store_legacy/protocol as legacy_store,
../waku_store_legacy/client as legacy_store_client,
../waku_store_legacy/common as legacy_store_common,
../waku_store/protocol as store,
../waku_store/client as store_client,
../waku_store/common as store_common,
../waku_store/resume,
../waku_store_sync,
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_metadata,
../waku_rendezvous/protocol,
../waku_rendezvous/client as rendezvous_client,
../waku_rendezvous/waku_peer_record,
../waku_lightpush_legacy/client as legacy_ligntpuhs_client,
../waku_lightpush_legacy as legacy_lightpush_protocol,
../waku_lightpush/client as ligntpuhs_client,
../waku_lightpush as lightpush_protocol,
../waku_enr,
../waku_peer_exchange,
../waku_rln_relay,
waku/[
waku_core,
waku_core/topics/sharding,
waku_relay,
waku_archive,
waku_archive_legacy,
waku_store_legacy/protocol as legacy_store,
waku_store_legacy/client as legacy_store_client,
waku_store_legacy/common as legacy_store_common,
waku_store/protocol as store,
waku_store/client as store_client,
waku_store/common as store_common,
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
waku_rendezvous/client as rendezvous_client,
waku_rendezvous/waku_peer_record,
waku_lightpush_legacy/client as legacy_ligntpuhs_client,
waku_lightpush_legacy as legacy_lightpush_protocol,
waku_lightpush/client as ligntpuhs_client,
waku_lightpush as lightpush_protocol,
waku_enr,
waku_peer_exchange,
waku_rln_relay,
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
waku_mix,
requests/node_requests,
],
./net_config,
./peer_manager,
../common/rate_limit/setting,
../common/callbacks,
../common/nimchronos,
../waku_mix
./peer_manager
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
@ -131,6 +134,18 @@ type
rateLimitSettings*: ProtocolRateLimitSettings
wakuMix*: WakuMix
proc deduceRelayShard(
node: WakuNode,
contentTopic: ContentTopic,
pubsubTopicOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[RelayShard, string] =
let pubsubTopic = pubsubTopicOp.valueOr:
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled.")
node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
let msg = "Deducing shard failed: " & error
return err(msg)
proc getShardsGetter(node: WakuNode): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
# fetch pubsubTopics subscribed to relay and convert them to shards
@ -252,6 +267,7 @@ proc mountAutoSharding*(
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
node.wakuAutoSharding =
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
return ok()
proc getMixNodePoolSize*(node: WakuNode): int =
@ -443,6 +459,20 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
return ok()
proc startProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.setProvider(
proc(
pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic
): Future[Result[RequestRelayShard, string]] {.async.} =
let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr:
return err($error)
return ok(RequestRelayShard(relayShard: shard))
).isOkOr:
error "Can't set proveder for RequestRelayShard", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.clearProvider()
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
@ -491,6 +521,8 @@ proc start*(node: WakuNode) {.async.} =
## The switch will update addresses after start using the addressMapper
await node.switch.start()
node.startProvidersAndListeners()
node.started = true
if not zeroPortPresent:
@ -503,6 +535,9 @@ proc start*(node: WakuNode) {.async.} =
proc stop*(node: WakuNode) {.async.} =
## By stopping the switch we are stopping all the underlying mounted protocols
node.stopProvidersAndListeners()
await node.switch.stop()
node.peerManager.stop()

View File

@ -0,0 +1,23 @@
import waku/common/broker/[request_broker, multi_request_broker]
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health]
import waku/waku_core/topics
export protocol_health, topic_health
RequestBroker:
type RequestNodeHealth* = object
healthStatus*: NodeHealth
RequestBroker:
type RequestRelayTopicsHealth* = object
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
proc signature(
topics: seq[PubsubTopic]
): Future[Result[RequestRelayTopicsHealth, string]] {.async.}
MultiRequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth

View File

@ -0,0 +1,11 @@
import std/options
import waku/common/broker/[request_broker, multi_request_broker]
import waku/waku_core/[topics]
RequestBroker:
type RequestRelayShard* = object
relayShard*: RelayShard
proc signature(
pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic
): Future[Result[RequestRelayShard, string]] {.async.}

View File

@ -0,0 +1,3 @@
import ./[health_request, rln_requests, node_requests]
export health_request, rln_requests, node_requests

View File

@ -0,0 +1,9 @@
import waku/common/broker/request_broker, waku/waku_core/message/message
RequestBroker:
type RequestGenerateRlnProof* = object
proof*: seq[byte]
proc signature(
message: WakuMessage, senderEpoch: float64
): Future[Result[RequestGenerateRlnProof, string]] {.async.}

View File

@ -0,0 +1,20 @@
type
EventEmitter* = object
# Placeholder for future event emitter implementation
observers*: seq[proc (data: EventData): void]
proc initEventEmitter*(): EventEmitter =
EventEmitter(observers: @[])
proc emitEvent*(emitter: var EventEmitter, data: EventData) =
for observer in emitter.observers:
asyncSpawn observer(data)
proc subscribeToEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) =
emitter.observers.add(observer)
proc unsubscribeFromEvent*(emitter: var EventEmitter, observer: proc (data: EventData): void) =
emitter.observers = emitter.observers.filterIt(it != observer)

View File

@ -19,6 +19,11 @@ func shortLog*(hash: WakuMessageHash): string =
func `$`*(hash: WakuMessageHash): string =
shortLog(hash)
func toString*(hash: WakuMessageHash): string =
var hexhash = newStringOfCap(64)
hexhash &= hash.toOpenArray(hash.low, hash.high).to0xHex()
hexhash
const EmptyWakuMessageHash*: WakuMessageHash = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,

View File

@ -10,9 +10,7 @@ import
bearssl/rand,
stew/byteutils
import
../node/peer_manager,
../node/delivery_monitor/subscriptions_observer,
../waku_core,
waku/[node/peer_manager, waku_core, events/delivery_events],
./common,
./protocol_metrics,
./rpc_codec,
@ -25,16 +23,12 @@ type WakuFilterClient* = ref object of LPProtocol
rng: ref HmacDrbgContext
peerManager: PeerManager
pushHandlers: seq[FilterPushHandler]
subscrObservers: seq[SubscriptionObserver]
func generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
proc addSubscrObserver*(wfc: WakuFilterClient, obs: SubscriptionObserver) =
wfc.subscrObservers.add(obs)
proc sendSubscribeRequest(
wfc: WakuFilterClient,
servicePeer: RemotePeerInfo,
@ -132,8 +126,7 @@ proc subscribe*(
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
for obs in wfc.subscrObservers:
obs.onSubscribe(pubSubTopic, contentTopicSeq)
OnFilterSubscribeEvent.emit(pubSubTopic, contentTopicSeq)
return ok()
@ -156,8 +149,7 @@ proc unsubscribe*(
?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
for obs in wfc.subscrObservers:
obs.onUnsubscribe(pubSubTopic, contentTopicSeq)
OnFilterUnSubscribeEvent.emit(pubSubTopic, contentTopicSeq)
return ok()

View File

@ -31,7 +31,7 @@ proc checkAndGenerateRLNProof*(
proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
return lightpushResultInternalError("no waku relay found")
@ -39,7 +39,7 @@ proc getRelayPushHandler*(
wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message).valueOr:

View File

@ -5,7 +5,6 @@ import libp2p/peerid, libp2p/stream/connection
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
../waku_core,
./common,
@ -19,16 +18,12 @@ logScope:
type WakuLightPushClient* = ref object
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
): T =
WakuLightPushClient(peerManager: peerManager, rng: rng)
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc ensureTimestampSet(message: var WakuMessage) =
if message.timestamp == 0:
message.timestamp = getNowInNanosecondTime()
@ -40,36 +35,43 @@ func shortPeerId(peer: PeerId): string =
func shortPeerId(peer: RemotePeerInfo): string =
shortLog(peer.peerId)
proc sendPushRequestToConn(
wl: WakuLightPushClient, request: LightPushRequest, conn: Connection
proc sendPushRequest(
wl: WakuLightPushClient,
req: LightPushRequest,
peer: PeerId | RemotePeerInfo,
conn: Option[Connection] = none(Connection),
): Future[WakuLightPushResult] {.async.} =
try:
await conn.writeLp(request.encode().buffer)
except LPStreamRemoteClosedError:
error "Failed to write request to peer", error = getCurrentExceptionMsg()
return lightpushResultInternalError(
"Failed to write request to peer: " & getCurrentExceptionMsg()
)
let connection = conn.valueOr:
(await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY,
dialFailure & ": " & $peer & " is not accessible",
)
defer:
await connection.closeWithEOF()
await connection.writeLP(req.encode().buffer)
var buffer: seq[byte]
try:
buffer = await conn.readLp(DefaultMaxRpcSize.int)
buffer = await connection.readLp(DefaultMaxRpcSize.int)
except LPStreamRemoteClosedError:
error "Failed to read response from peer", error = getCurrentExceptionMsg()
return lightpushResultInternalError(
"Failed to read response from peer: " & getCurrentExceptionMsg()
)
let response = LightpushResponse.decode(buffer).valueOr:
error "failed to decode response", error = $error
error "failed to decode response"
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
return lightpushResultInternalError(decodeRpcFailure)
let requestIdMismatch = response.requestId != request.requestId
let tooManyRequests = response.statusCode == LightPushErrorCode.TOO_MANY_REQUESTS
if requestIdMismatch and (not tooManyRequests):
# response with TOO_MANY_REQUESTS error code has no requestId by design
if response.requestId != req.requestId and
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
error "response failure, requestId mismatch",
requestId = request.requestId, responseRequestId = response.requestId
requestId = req.requestId, responseRequestId = response.requestId
return lightpushResultInternalError("response failure, requestId mismatch")
return toPushResult(response)
@ -80,37 +82,34 @@ proc publish*(
wakuMessage: WakuMessage,
dest: Connection | PeerId | RemotePeerInfo,
): Future[WakuLightPushResult] {.async, gcsafe.} =
let conn =
when dest is Connection:
dest
else:
(await wl.peerManager.dialPeer(dest, WakuLightPushCodec)).valueOr:
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
return lighpushErrorResult(
LightPushErrorCode.NO_PEERS_TO_RELAY,
"Peer is not accessible: " & dialFailure & " - " & $dest,
)
defer:
await conn.closeWithEOF()
var message = wakuMessage
ensureTimestampSet(message)
let msgHash = computeMessageHash(pubSubTopic.get(""), message).to0xHex()
let peerIdStr =
when dest is Connection:
shortPeerId(dest.peerId)
else:
shortPeerId(dest)
info "publish",
myPeerId = wl.peerManager.switch.peerInfo.peerId,
peerId = shortPeerId(conn.peerId),
peerId = peerIdStr,
msgHash = msgHash,
sentTime = getNowInNanosecondTime()
let request = LightpushRequest(
requestId: generateRequestId(wl.rng), pubsubTopic: pubSubTopic, message: message
)
let relayPeerCount = ?await wl.sendPushRequestToConn(request, conn)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic.get(""), message)
let relayPeerCount =
when dest is Connection:
?await wl.sendPushRequest(request, dest.peerId, some(dest))
elif dest is RemotePeerInfo:
?await wl.sendPushRequest(request, dest)
else:
?await wl.sendPushRequest(request, dest)
return lightpushSuccessResult(relayPeerCount)
@ -124,3 +123,12 @@ proc publishToAny*(
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
)
return await wl.publish(some(pubsubTopic), wakuMessage, peer)
proc publishWithConn*(
wl: WakuLightPushClient,
pubSubTopic: PubsubTopic,
message: WakuMessage,
conn: Connection,
destPeer: PeerId,
): Future[WakuLightPushResult] {.async, gcsafe.} =
return await wl.publish(some(pubSubTopic), message, conn)

View File

@ -25,7 +25,7 @@ type ErrorStatus* = tuple[code: LightpushStatusCode, desc: Option[string]]
type WakuLightPushResult* = Result[uint32, ErrorStatus]
type PushMessageHandler* = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async.}
const TooManyRequestsMessage* = "Request rejected due to too many requests"
@ -39,7 +39,7 @@ func toPushResult*(response: LightPushResponse): WakuLightPushResult =
return (
if (relayPeerCount == 0):
# Consider publishing to zero peers an error even if the service node
# sent us a "successful" response with zero peers
# sent us a "successful" response with zero peers
err((LightPushErrorCode.NO_PEERS_TO_RELAY, response.statusDesc))
else:
ok(relayPeerCount)

View File

@ -71,7 +71,7 @@ proc handleRequest(
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let res = (await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)).valueOr:
let res = (await wl.pushHandler(pubsubTopic, pushRequest.message)).valueOr:
return err((code: error.code, desc: error.desc))
return ok(res)

View File

@ -30,7 +30,7 @@ proc checkAndGenerateRLNProof*(
proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
@ -38,7 +38,7 @@ proc getRelayPushHandler*(
wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let msgWithProof = ?checkAndGenerateRLNProof(rlnPeer, message)

View File

@ -5,7 +5,6 @@ import libp2p/peerid
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
../waku_core,
./common,
@ -19,7 +18,6 @@ logScope:
type WakuLegacyLightPushClient* = ref object
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
publishObservers: seq[PublishObserver]
proc new*(
T: type WakuLegacyLightPushClient,
@ -28,9 +26,6 @@ proc new*(
): T =
WakuLegacyLightPushClient(peerManager: peerManager, rng: rng)
proc addPublishObserver*(wl: WakuLegacyLightPushClient, obs: PublishObserver) =
wl.publishObservers.add(obs)
proc sendPushRequest(
wl: WakuLegacyLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
@ -86,9 +81,6 @@ proc publish*(
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
@ -111,7 +103,4 @@ proc publishToAny*(
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)
for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok()

View File

@ -9,7 +9,7 @@ export WakuLegacyLightPushCodec
type WakuLightPushResult*[T] = Result[T, string]
type PushMessageHandler* = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}
const TooManyRequestsMessage* = "TOO_MANY_REQUESTS"

View File

@ -53,7 +53,7 @@ proc handleRequest*(
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
let handleRes = await wl.pushHandler(pubsubTopic, message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)

View File

@ -1,3 +1,4 @@
import ./waku_relay/[protocol, topic_health]
import ./waku_relay/protocol
import waku/node/health_monitor/topic_health
export protocol, topic_health

View File

@ -17,8 +17,12 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer
waku/waku_core,
waku/node/health_monitor/topic_health,
waku/requests/health_request,
./message_id
from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
@ -157,7 +161,6 @@ type
# map topic with its assigned validator within pubsub
topicHandlers: Table[PubsubTopic, TopicHandler]
# map topic with the TopicHandler proc in charge of attending topic's incoming message events
publishObservers: seq[PublishObserver]
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]
@ -321,6 +324,19 @@ proc initRelayObservers(w: WakuRelay) =
w.addObserver(administrativeObserver)
proc initRequestProviders(w: WakuRelay) =
RequestRelayTopicsHealth.setProvider(
proc(
topics: seq[PubsubTopic]
): Future[Result[RequestRelayTopicsHealth, string]] {.async.} =
var collectedRes: RequestRelayTopicsHealth
for topic in topics:
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
collectedRes.topicHealth.add((topic, health))
return ok(collectedRes)
).isOkOr:
error "Cannot set Relay Topics Health request provider", error = error
proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
@ -340,9 +356,10 @@ proc new*(
)
procCall GossipSub(w).initPubSub()
w.topicsHealth = initTable[string, TopicHealth]()
w.initProtocolHandler()
w.initRelayObservers()
w.topicsHealth = initTable[string, TopicHealth]()
w.initRequestProviders()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
@ -353,12 +370,6 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))
proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
## Observer when the api client performed a publish operation. This
## is initially aimed for bringing an additional layer of delivery reliability thanks
## to store
w.publishObservers.add(obs)
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
## Observes when a message is sent/received from the GossipSub PoV
procCall GossipSub(w).addObserver(observer)
@ -628,9 +639,6 @@ proc publish*(
if relayedPeerCount <= 0:
return err(NoPeersToPublish)
for obs in w.publishObservers:
obs.onMessagePublished(pubSubTopic, message)
return ok(relayedPeerCount)
proc getConnectedPubSubPeers*(

View File

@ -24,10 +24,13 @@ import
./nonce_manager
import
../common/error_handling,
../waku_relay, # for WakuRelayHandler
../waku_core,
../waku_keystore
waku/[
common/error_handling,
waku_relay, # for WakuRelayHandler
waku_core,
requests/rln_requests,
waku_keystore,
]
logScope:
topics = "waku rln_relay"
@ -91,6 +94,7 @@ proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
# stop the group sync, and flush data to tree db
info "stopping rln-relay"
RequestGenerateRlnProof.clearProvider()
await rlnPeer.groupManager.stop()
proc hasDuplicate*(
@ -275,11 +279,11 @@ proc validateMessageAndUpdateLog*(
return isValidMessage
proc appendRLNProof*(
rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64
): RlnRelayResult[void] =
## returns true if it can create and append a `RateLimitProof` to the supplied `msg`
## returns false otherwise
proc createRlnProof(
rlnPeer: WakuRLNRelay, msg: WakuMessage, senderEpochTime: float64
): RlnRelayResult[seq[byte]] =
## returns a new `RateLimitProof` for the supplied `msg`
## returns an error if it cannot create the proof
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
@ -291,7 +295,14 @@ proc appendRLNProof*(
let proof = rlnPeer.groupManager.generateProof(input, epoch, nonce).valueOr:
return err("could not generate rln-v2 proof: " & $error)
msg.proof = proof.encode().buffer
return ok(proof.encode().buffer)
proc appendRLNProof*(
rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64
): RlnRelayResult[void] =
msg.proof = rlnPeer.createRlnProof(msg, senderEpochTime).valueOr:
return err($error)
return ok()
proc clearNullifierLog*(rlnPeer: WakuRlnRelay) =
@ -438,6 +449,18 @@ proc mount(
# Start epoch monitoring in the background
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
RequestGenerateRlnProof.setProvider(
proc(
msg: WakuMessage, senderEpochTime: float64
): Future[Result[RequestGenerateRlnProof, string]] {.async.} =
let proof = createRlnProof(wakuRlnRelay, msg, senderEpochTime).valueOr:
return err("Could not create RLN proof: " & $error)
return ok(RequestGenerateRlnProof(proof: proof))
).isOkOr:
return err("Proof generator provider cannot be set")
return ok(wakuRlnRelay)
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =