Various fixes upon test failures. Added initial of subscribe API and auto-subscribe for send api

This commit is contained in:
NagyZoltanPeter 2026-01-23 16:58:12 +01:00
parent 7fa1515e01
commit 1c08ba5ea1
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
16 changed files with 360 additions and 115 deletions

View File

@ -1,23 +1,12 @@
{.used.}
import std/strutils
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
import ../testlib/[common, wakucore, wakunode, testasync]
import ../waku_archive/archive_utils
import
std/[options, sequtils, strutils],
chronos,
testutils/unittests,
stew/byteutils,
libp2p/[switch, peerinfo]
import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
import
waku,
waku/[
waku_node,
waku_core,
waku_relay/protocol,
waku_filter_v2/common,
waku_store/common,
common/broker/broker_context,
]
import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config
waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context]
import waku/api/api_conf, waku/factory/waku_conf
suite "Waku API - Send":
var
@ -38,43 +27,52 @@ suite "Waku API - Send":
storeNodePeerId {.threadvar.}: PeerId
asyncSetup:
# handlerFuture = newPushHandlerFuture()
# handler = proc(
# peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
# ): Future[WakuLightPushResult[void]] {.async.} =
# handlerFuture.complete((pubsubTopic, message))
# return ok()
lockNewGlobalBrokerContext:
relayNode1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await relayNode1.start()
relayNode1.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await relayNode1.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
await relayNode1.mountLibp2pPing()
await relayNode1.start()
lockNewGlobalBrokerContext:
relayNode2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await relayNode2.start()
relayNode2.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await relayNode2.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
await relayNode2.mountLibp2pPing()
await relayNode2.start()
lockNewGlobalBrokerContext:
lightpushNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await lightpushNode.start()
lightpushNode.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await lightpushNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
(await lightpushNode.mountLightPush()).isOkOr:
raiseAssert "Failed to mount lightpush"
await lightpushNode.mountLibp2pPing()
await lightpushNode.start()
lockNewGlobalBrokerContext:
storeNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await storeNode.start()
storeNode.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await storeNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
# Mount archive so store can persist messages
let archiveDriver = newSqliteArchiveDriver()
storeNode.mountArchive(archiveDriver).isOkOr:
raiseAssert "Failed to mount archive: " & error
await storeNode.mountStore()
await storeNode.mountLibp2pPing()
await storeNode.start()
relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo()
relayNode1PeerId = relayNode1.peerInfo.peerId
@ -88,6 +86,27 @@ suite "Waku API - Send":
storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo()
storeNodePeerId = storeNode.peerInfo.peerId
# Subscribe all relay nodes to the default shard topic
const testPubsubTopic = PubsubTopic("/waku/2/rs/1/0")
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
discard
relayNode1.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr:
raiseAssert "Failed to subscribe relayNode1: " & error
relayNode2.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr:
raiseAssert "Failed to subscribe relayNode2: " & error
lightpushNode.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr:
raiseAssert "Failed to subscribe lightpushNode: " & error
storeNode.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr:
raiseAssert "Failed to subscribe storeNode: " & error
# Subscribe all relay nodes to the default shard topic
await relayNode1.connectToNodes(@[relayNode2PeerInfo, storeNodePeerInfo])
await lightpushNode.connectToNodes(@[relayNode2PeerInfo])
asyncTeardown:
await allFutures(
relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop()
@ -120,27 +139,74 @@ suite "Waku API - Send":
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
# node is not connected !
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let sendResult = await node.send(envelope)
check sendResult.isErr() # Depending on implementation, it might say "not healthy"
check sendResult.error().contains("not healthy")
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Check API availability (healthy node)":
# Create a node config that doesn't start or has no peers
let nodeConfig = NodeConfig.init(
mode = WakuMode.Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
p2pReliability = true,
)
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(nodeConfig)).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(
@[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo]
)
let sentEventFuture = newFuture[void]("sentEvent")
let sentListener = MessageSentEvent.listen(
node.brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
raiseAssert "Should not be called"
echo "SENT EVENT TRIGGERED: requestId=", event.requestId
if not sentEventFuture.finished():
sentEventFuture.complete()
,
).valueOr:
raiseAssert error
let errorEventFuture = newFuture[void]("errorEvent")
let errorListener = MessageErrorEvent.listen(
node.brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
check true
echo "ERROR EVENT TRIGGERED: ", event.error
if not errorEventFuture.finished():
errorEventFuture.fail(
newException(CatchableError, "Error event triggered: " & event.error)
)
,
).valueOr:
raiseAssert error
let propagatedEventFuture = newFuture[void]("propagatedEvent")
let propagatedListener = MessagePropagatedEvent.listen(
node.brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
raiseAssert "Should not be called"
echo "PROPAGATED EVENT TRIGGERED: requestId=", event.requestId
if not propagatedEventFuture.finished():
propagatedEventFuture.complete()
,
).valueOr:
raiseAssert error
@ -155,13 +221,16 @@ suite "Waku API - Send":
let sendResult = await node.send(envelope)
if sendResult.isErr():
echo "Send error: ", sendResult.error
check sendResult.isOk() # Depending on implementation, it might say "not healthy"
check:
sendResult.isErr()
# Depending on implementation, it might say "not healthy"
sendResult.error.contains("not healthy")
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await allFutures(sentEventFuture, propagatedEventFuture, errorEventFuture)
.withTimeout(eventTimeout)
check sentEventFuture.completed()
check propagatedEventFuture.completed()
check not errorEventFuture.failed()
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error

View File

@ -3,6 +3,7 @@ import chronicles, chronos, results
import waku/factory/waku
import waku/[requests/health_request, waku_core, waku_node]
import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_service
import ./[api_conf, types]
logScope:
@ -32,11 +33,23 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
warn "Failed to get Waku node health status: ", error = healthStatus.error
# Let's suppose the node is hesalthy enough, go ahead
else:
if healthStatus.get().healthStatus != NodeHealth.Unhealthy:
if healthStatus.get().healthStatus == NodeHealth.Unhealthy:
return err("Waku node is not healthy, has got no connections.")
return ok()
proc subscribe*(
w: Waku, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.subscribe(contentTopic)
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.unsubscribe(contentTopic)
proc send*(
w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
@ -45,7 +58,13 @@ proc send*(
let requestId = newRequestId(w.rng)
let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr:
return err("Failed to create delivery task: " & error)
return err("API send: Failed to create delivery task: " & error)
info "API send: scheduling delivery task",
requestId = $requestId,
pubsubTopic = deliveryTask.pubsubTopic,
contentTopic = deliveryTask.msg.contentTopic,
msgHash = deliveryTask.msgHash.shortLog()
asyncSpawn w.deliveryService.sendService.send(deliveryTask)

View File

@ -86,6 +86,7 @@ type NodeConfig* {.requiresInit.} = object
protocolsConfig: ProtocolsConfig
networkingConfig: NetworkingConfig
ethRpcEndpoints: seq[string]
p2pReliability: bool
proc init*(
T: typedesc[NodeConfig],
@ -93,12 +94,14 @@ proc init*(
protocolsConfig: ProtocolsConfig = TheWakuNetworkPreset,
networkingConfig: NetworkingConfig = DefaultNetworkingConfig,
ethRpcEndpoints: seq[string] = @[],
p2pReliability: bool = false,
): T =
return T(
mode: mode,
protocolsConfig: protocolsConfig,
networkingConfig: networkingConfig,
ethRpcEndpoints: ethRpcEndpoints,
p2pReliability: p2pReliability,
)
proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] =
@ -202,6 +205,7 @@ proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] =
## Various configurations
b.withNatStrategy("any")
b.withP2PReliability(nodeConfig.p2pReliability)
let wakuConf = b.build().valueOr:
return err("Failed to build configuration: " & error)

View File

@ -1,5 +1,6 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/waku_core/message
export types
@ -21,3 +22,9 @@ EventBroker:
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage

View File

@ -428,15 +428,45 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
return err("Health report not available")
try:
let healthReport = healthReportFut.read()
# Convert HealthStatus to NodeHealth
# Check if Relay or Lightpush Client is ready (MinimallyHealthy condition)
var relayReady = false
var lightpushClientReady = false
var storeClientReady = false
var filterClientReady = false
for protocolHealth in healthReport.protocolsHealth:
if protocolHealth.protocol == "Relay" and
protocolHealth.health == HealthStatus.READY:
relayReady = true
elif protocolHealth.protocol == "Lightpush Client" and
protocolHealth.health == HealthStatus.READY:
lightpushClientReady = true
elif protocolHealth.protocol == "Store Client" and
protocolHealth.health == HealthStatus.READY:
storeClientReady = true
elif protocolHealth.protocol == "Filter Client" and
protocolHealth.health == HealthStatus.READY:
filterClientReady = true
# Determine node health based on protocol states
let isMinimallyHealthy = relayReady or lightpushClientReady
let nodeHealth =
case healthReport.nodeHealth
of HealthStatus.READY:
if isMinimallyHealthy and storeClientReady and filterClientReady:
NodeHealth.Healthy
of HealthStatus.SYNCHRONIZING, HealthStatus.INITIALIZING:
elif isMinimallyHealthy:
NodeHealth.MinimallyHealthy
else:
NodeHealth.Unhealthy
debug "Providing health report",
nodeHealth = $nodeHealth,
relayReady = relayReady,
lightpushClientReady = lightpushClientReady,
storeClientReady = storeClientReady,
filterClientReady = filterClientReady,
details = $(healthReport)
ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError:
err("Failed to read health report: " & getCurrentExceptionMsg()),

View File

@ -5,6 +5,7 @@ import chronos
import
./recv_service,
./send_service,
./subscription_service,
waku/[
waku_core,
waku_node,
@ -17,15 +18,24 @@ import
type DeliveryService* = ref object
sendService*: SendService
recvService: RecvService
subscriptionService*: SubscriptionService
proc new*(
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryService
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
let sendService = ?SendService.new(useP2PReliability, w)
let recvService = RecvService.new(w)
return ok(DeliveryService(sendService: sendService, recvService: recvService))
let subscriptionService = SubscriptionService.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionService)
let recvService = RecvService.new(w, subscriptionService)
return ok(
DeliveryService(
sendService: sendService,
recvService: recvService,
subscriptionService: subscriptionService,
)
)
proc startDeliveryService*(self: DeliveryService) =
self.sendService.startSendService()

View File

@ -4,6 +4,7 @@
import std/[tables, sequtils, options]
import chronos, chronicles, libp2p/utility
import ../[subscription_service]
import
waku/[
waku_core,
@ -41,6 +42,7 @@ type RecvService* = ref object of RootObj
node: WakuNode
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnsubscribeEventListener
subscriptionService: SubscriptionService
recentReceivedMsgs: seq[RecvMessage]
@ -90,7 +92,6 @@ proc msgChecker(self: RecvService) {.async.} =
## Continuously checks if a message has been received
while true:
await sleepAsync(StoreCheckPeriod)
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
@ -156,18 +157,24 @@ proc onUnsubscribe(
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
proc new*(T: type RecvService, node: WakuNode): T =
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
var recvService =
RecvService(node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx)
var recvService = RecvService(
node: node,
startTimeToCheck: now,
brokerCtx: node.brokerCtx,
subscriptionService: s,
topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](),
recentReceivedMsgs: @[],
)
if not node.wakuFilterClient.isNil():
let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages recived through filter
## Captures all the messages received through filter
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)

View File

@ -21,7 +21,7 @@ type DeliveryTask* = ref object
errorDesc*: string
proc create*(
T: type DeliveryTask,
T: typedesc[DeliveryTask],
requestId: RequestId,
envelop: MessageEnvelope,
brokerCtx: BrokerContext,
@ -31,6 +31,7 @@ proc create*(
let relayShardRes = (
RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic)
).valueOr:
echo "RequestRelayShard.request error", $error
return err($error)
let pubsubTopic = relayShardRes.relayShard.toPubsubTopic()

View File

@ -2,10 +2,9 @@ import chronicles, chronos, results
import std/options
import
waku/waku_node,
waku/waku_core,
waku/node/peer_manager,
waku/waku_lightpush/[callbacks, common, client, rpc],
waku/waku_core,
waku/waku_lightpush/[common, client, rpc],
waku/common/broker/broker_context
import ./[delivery_task, send_processor]
@ -18,7 +17,7 @@ type LightpushSendProcessor* = ref object of BaseSendProcessor
lightpushClient: WakuLightPushClient
proc new*(
T: type LightpushSendProcessor,
T: typedesc[LightpushSendProcessor],
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
brokerCtx: BrokerContext,
@ -41,7 +40,9 @@ method sendImpl*(
): Future[void] {.async.} =
task.tryCount.inc()
info "Trying message delivery via Lightpush",
requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
tryCount = task.tryCount
let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr:
task.state = DeliveryState.NextRoundRetry

View File

@ -1,6 +1,6 @@
import chronos, chronicles
import std/options
import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc]
import waku/[waku_core], waku/waku_lightpush/[common, rpc]
import waku/requests/health_request
import waku/common/broker/broker_context
import waku/api/types
@ -33,36 +33,42 @@ proc new*(
proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
error "isTopicHealthy: failed to get health report", topic = topic, error = error
return false
if healthReport.topicHealth.len() < 1:
warn "isTopicHealthy: no topic health entries", topic = topic
return false
let health = healthReport.topicHealth[0].health
debug "isTopicHealthy: topic health is ", topic = topic, health = health
return health == MINIMALLY_HEALTHY or health == SUFFICIENTLY_HEALTHY
method isValidProcessor*(
self: RelaySendProcessor, task: DeliveryTask
): bool {.gcsafe.} =
return self.isTopicHealthy(task.pubsubTopic)
# Topic health query is not reliable enough after a fresh subscribe...
# return self.isTopicHealthy(task.pubsubTopic)
return true
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
task.tryCount.inc()
info "Trying message delivery via Relay",
requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
tryCount = task.tryCount
let pushResult = await self.publishProc(task.pubsubTopic, task.msg)
if pushResult.isErr():
let errorMessage = pushResult.error.desc.get($pushResult.error.code)
let noOfPublishedPeers = (await self.publishProc(task.pubsubTopic, task.msg)).valueOr:
let errorMessage = error.desc.get($error.code)
error "Failed to publish message with relay",
request = task.requestId, msgHash = task.msgHash, error = errorMessage
if pushResult.error.code != LightPushErrorCode.NO_PEERS_TO_RELAY:
if error.code != LightPushErrorCode.NO_PEERS_TO_RELAY:
task.state = DeliveryState.FailedToDeliver
task.errorDesc = errorMessage
else:
task.state = self.fallbackStateToSet
return
if pushResult.isOk and pushResult.get() > 0:
if noOfPublishedPeers > 0:
info "Message propagated via Relay",
requestId = task.requestId, msgHash = task.msgHash
task.state = DeliveryState.SuccessfullyPropagated

View File

@ -5,18 +5,17 @@ import std/[sequtils, tables, options]
import chronos, chronicles, libp2p/utility
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
../[subscription_service],
waku/[
waku_core,
node/waku_node,
node/peer_manager,
waku_store/client,
waku_store/common,
waku_archive/archive,
waku_relay/protocol,
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
events/delivery_events,
events/message_events,
common/broker/broker_context,
]
@ -59,6 +58,7 @@ type SendService* = ref object of RootObj
node: WakuNode
checkStoreForMessages: bool
subscriptionService: SubscriptionService
proc setupSendProcessorChain(
peerManager: PeerManager,
@ -88,14 +88,14 @@ proc setupSendProcessorChain(
processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx))
var currentProcessor: BaseSendProcessor = processors[0]
for i in 1 ..< processors.len():
for i in 1 ..< processors.len:
currentProcessor.chain(processors[i])
currentProcessor = processors[i]
return ok(processors[0])
proc new*(
T: type SendService, preferP2PReliability: bool, w: WakuNode
T: type SendService, preferP2PReliability: bool, w: WakuNode, s: SubscriptionService
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(
@ -116,6 +116,7 @@ proc new*(
sendProcessor: sendProcessorChain,
node: w,
checkStoreForMessages: checkStoreForMessages,
subscriptionService: s,
)
return ok(sendService)
@ -173,18 +174,21 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
of DeliveryState.SuccessfullyPropagated:
# TODO: in case of of unable to strore check messages shall we report success instead?
info "Message successfully propagated",
requestId = task.requestId, msgHash = task.msgHash
MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString())
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex())
return
of DeliveryState.SuccessfullyValidated:
info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash
MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString())
info "Message successfully sent",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex())
return
of DeliveryState.FailedToDeliver:
error "Failed to send message",
requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
error = task.errorDesc
MessageErrorEvent.emit(
self.brokerCtx, task.requestId, task.msgHash.toString(), task.errorDesc
self.brokerCtx, task.requestId, task.msgHash.to0xHex(), task.errorDesc
)
return
else:
@ -193,19 +197,22 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
if task.messageAge() > MaxTimeInCache:
error "Failed to send message",
requestId = task.requestId, msgHash = task.msgHash, error = "Message too old"
requestId = task.requestId,
msgHash = task.msgHash.to0xHex(),
error = "Message too old",
age = task.messageAge()
task.state = DeliveryState.FailedToDeliver
MessageErrorEvent.emit(
self.brokerCtx,
task.requestId,
task.msgHash.toString(),
task.msgHash.to0xHex(),
"Unable to send within retry time window",
)
proc evaluateAndCleanUp(self: SendService) =
self.taskCache.forEach(self.reportTaskResult(it))
self.taskCache.keepItIf(
it.state != DeliveryState.SuccessfullyValidated or
it.state != DeliveryState.SuccessfullyValidated and
it.state != DeliveryState.FailedToDeliver
)
@ -238,9 +245,16 @@ proc stopSendService*(self: SendService) =
if not self.serviceLoopHandle.isNil():
discard self.serviceLoopHandle.cancelAndWait()
proc send*(self: SendService, task: DeliveryTask): Future[void] {.async.} =
proc send*(self: SendService, task: DeliveryTask) {.async.} =
assert(not task.isNil(), "task for send must not be nil")
info "SendService.send: processing delivery task",
requestId = task.requestId, msgHash = task.msgHash
self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr:
error "SendService.send: failed to subscribe to content topic",
contentTopic = task.msg.contentTopic, error = error
await self.sendProcessor.process(task)
reportTaskResult(self, task)
if task.state != DeliveryState.FailedToDeliver:

View File

@ -0,0 +1,64 @@
import chronos, chronicles
import
waku/[
waku_core,
waku_core/topics,
events/message_events,
waku_node,
common/broker/broker_context,
]
type SubscriptionService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
proc isSubscribed*(
self: SubscriptionService, topic: ContentTopic
): Result[bool, string] =
var isSubscribed = false
if self.node.wakuRelay.isNil() == false:
return self.node.isSubscribed((kind: ContentSub, topic: topic))
# TODO: Add support for edge mode with Filter subscription management
return ok(isSubscribed)
#TODO: later PR may consider to refactor or place this function elsewhere
# The only important part is that it emits MessageReceivedEvent
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "API received message",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
let isSubscribed = self.isSubscribed(topic).valueOr:
error "Failed to check subscription status: ", error = error
return err("Failed to check subscription status: " & error)
if isSubscribed == false:
if self.node.wakuRelay.isNil() == false:
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
error "Failed to subscribe: ", error = error
return err("Failed to subscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if self.node.wakuRelay.isNil() == false:
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
error "Failed to unsubscribe: ", error = error
return err("Failed to unsubscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()

View File

@ -30,6 +30,8 @@ import
../peer_manager,
../../waku_rln_relay
export waku_relay.WakuRelayHandler
declarePublicHistogram waku_histogram_message_size,
"message size histogram in kB",
buckets = [
@ -91,6 +93,23 @@ proc registerRelayHandler(
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
proc getTopicOfSubscriptionEvent(
node: WakuNode, subscription: SubscriptionEvent
): Result[(PubsubTopic, Option[ContentTopic]), string] =
case subscription.kind
of ContentSub, ContentUnsub:
if node.wakuAutoSharding.isSome():
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
return err("Autosharding error: " & error)
return ok(($shard, some(subscription.topic)))
else:
return
err("Static sharding is used, relay subscriptions must specify a pubsub topic")
of PubsubSub, PubsubUnsub:
return ok((subscription.topic, none[ContentTopic]()))
else:
return err("Unsupported subscription type in relay getTopicOfSubscriptionEvent")
proc subscribe*(
node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler
): Result[void, string] =
@ -101,27 +120,15 @@ proc subscribe*(
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentSub:
if node.wakuAutoSharding.isSome():
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
else:
return err(
"Static sharding is used, relay subscriptions must specify a pubsub topic"
)
of PubsubSub:
(subscription.topic, none(ContentTopic))
else:
return err("Unsupported subscription type in relay subscribe")
let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr:
error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error)
if node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
return ok()
info "subscribe", pubsubTopic, contentTopicOp
node.registerRelayHandler(pubsubTopic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
@ -136,22 +143,9 @@ proc unsubscribe*(
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentUnsub:
if node.wakuAutoSharding.isSome():
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
else:
return err(
"Static sharding is used, relay subscriptions must specify a pubsub topic"
)
of PubsubUnsub:
(subscription.topic, none(ContentTopic))
else:
return err("Unsupported subscription type in relay unsubscribe")
let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr:
error "Failed to decode unsubscribe event", error = error
return err("Failed to decode unsubscribe event: " & error)
if not node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
@ -163,6 +157,19 @@ proc unsubscribe*(
return ok()
proc isSubscribed*(
node: WakuNode, subscription: SubscriptionEvent
): Result[bool, string] =
if node.wakuRelay.isNil():
error "Invalid API call to `isSubscribed`. WakuRelay not mounted."
return err("Invalid API call to `isSubscribed`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr:
error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error)
return ok(node.wakuRelay.isSubscribed(pubsubTopic))
proc publish*(
node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage
): Future[Result[int, string]] {.async, gcsafe.} =

View File

@ -145,9 +145,14 @@ proc deduceRelayShard(
let pubsubTopic = pubsubTopicOp.valueOr:
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled.")
node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
let msg = "Deducing shard failed: " & error
return err(msg)
let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
let msg = "Deducing shard failed: " & error
return err(msg)
return ok(shard)
let shard = RelayShard.parse(pubsubTopic).valueOr:
return err("Invalid topic:" & pubsubTopic & " " & $error)
return ok(shard)
proc getShardsGetter(node: WakuNode): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =

View File

@ -19,7 +19,7 @@ func shortLog*(hash: WakuMessageHash): string =
func `$`*(hash: WakuMessageHash): string =
shortLog(hash)
func toString*(hash: WakuMessageHash): string =
func to0xHex*(hash: WakuMessageHash): string =
var hexhash = newStringOfCap(64)
hexhash &= hash.toOpenArray(hash.low, hash.high).to0xHex()
hexhash

View File

@ -584,6 +584,7 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
procCall GossipSub(w).subscribe(pubsubTopic, topicHandler)
w.topicHandlers[pubsubTopic] = topicHandler
asyncSpawn w.updateTopicsHealth()
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic