mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe where appropriate, removed leftover
This commit is contained in:
parent
130ebd7509
commit
15f0ab9e52
@ -12,19 +12,25 @@ proc periodicSender(w: Waku): Future[void] {.async.} =
|
|||||||
proc(event: MessageSentEvent) {.async: (raises: []).} =
|
proc(event: MessageSentEvent) {.async: (raises: []).} =
|
||||||
echo "Message sent with request ID: ",
|
echo "Message sent with request ID: ",
|
||||||
event.requestId, " hash: ", event.messageHash
|
event.requestId, " hash: ", event.messageHash
|
||||||
)
|
).valueOr:
|
||||||
|
echo "Failed to listen to message sent event: ", error
|
||||||
|
return
|
||||||
|
|
||||||
let errorListener = MessageErrorEvent.listen(
|
let errorListener = MessageErrorEvent.listen(
|
||||||
proc(event: MessageErrorEvent) {.async: (raises: []).} =
|
proc(event: MessageErrorEvent) {.async: (raises: []).} =
|
||||||
echo "Message failed to send with request ID: ",
|
echo "Message failed to send with request ID: ",
|
||||||
event.requestId, " error: ", event.error
|
event.requestId, " error: ", event.error
|
||||||
)
|
).valueOr:
|
||||||
|
echo "Failed to listen to message error event: ", error
|
||||||
|
return
|
||||||
|
|
||||||
let propagatedListener = MessagePropagatedEvent.listen(
|
let propagatedListener = MessagePropagatedEvent.listen(
|
||||||
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
|
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
|
||||||
echo "Message propagated with request ID: ",
|
echo "Message propagated with request ID: ",
|
||||||
event.requestId, " hash: ", event.messageHash
|
event.requestId, " hash: ", event.messageHash
|
||||||
)
|
).valueOr:
|
||||||
|
echo "Failed to listen to message propagated event: ", error
|
||||||
|
return
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
MessageSentEvent.dropListener(sentListener)
|
MessageSentEvent.dropListener(sentListener)
|
||||||
|
|||||||
@ -3,7 +3,7 @@ import chronicles, chronos, results
|
|||||||
import waku/factory/waku
|
import waku/factory/waku
|
||||||
import waku/[requests/health_request, waku_core, waku_node]
|
import waku/[requests/health_request, waku_core, waku_node]
|
||||||
import waku/node/delivery_service/send_service
|
import waku/node/delivery_service/send_service
|
||||||
import ./[api_conf, types], ./subscribe/subscribe
|
import ./[api_conf, types]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "api"
|
topics = "api"
|
||||||
@ -26,7 +26,7 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
|
|||||||
|
|
||||||
# check if health is satisfactory
|
# check if health is satisfactory
|
||||||
# If Node is not healthy, return err("Waku node is not healthy")
|
# If Node is not healthy, return err("Waku node is not healthy")
|
||||||
let healthStatus = waitFor RequestNodeHealth.request()
|
let healthStatus = RequestNodeHealth.request()
|
||||||
|
|
||||||
if healthStatus.isErr():
|
if healthStatus.isErr():
|
||||||
warn "Failed to get Waku node health status: ", error = healthStatus.error
|
warn "Failed to get Waku node health status: ", error = healthStatus.error
|
||||||
@ -37,17 +37,6 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc subscribe*(
|
|
||||||
w: Waku, contentTopic: ContentTopic
|
|
||||||
): Future[Result[RequestId, string]] {.async.} =
|
|
||||||
?checkApiAvailability(w)
|
|
||||||
|
|
||||||
let requestId = newRequestId(w.rng)
|
|
||||||
|
|
||||||
asyncSpawn w.subscribeImpl(requestId, contentTopic)
|
|
||||||
|
|
||||||
return ok(requestId)
|
|
||||||
|
|
||||||
proc send*(
|
proc send*(
|
||||||
w: Waku, envelope: MessageEnvelope
|
w: Waku, envelope: MessageEnvelope
|
||||||
): Future[Result[RequestId, string]] {.async.} =
|
): Future[Result[RequestId, string]] {.async.} =
|
||||||
|
|||||||
@ -1,12 +0,0 @@
|
|||||||
# import chronicles, chronos, results
|
|
||||||
import chronos
|
|
||||||
import waku/waku_core
|
|
||||||
import waku/api/types
|
|
||||||
import waku/factory/waku
|
|
||||||
|
|
||||||
proc subscribeImpl*(
|
|
||||||
w: Waku, requestId: RequestId, contentTopic: ContentTopic
|
|
||||||
): Future[void] {.async.} =
|
|
||||||
## Implementation of the subscribe API
|
|
||||||
## This is a placeholder implementation
|
|
||||||
await sleepAsync(1000) # Simulate async work
|
|
||||||
@ -30,7 +30,7 @@ proc isLightpushPeerAvailable(
|
|||||||
|
|
||||||
method isValidProcessor*(
|
method isValidProcessor*(
|
||||||
self: LightpushSendProcessor, task: DeliveryTask
|
self: LightpushSendProcessor, task: DeliveryTask
|
||||||
): Future[bool] {.async.} =
|
): bool {.gcsafe.} =
|
||||||
return self.isLightpushPeerAvailable(task.pubsubTopic)
|
return self.isLightpushPeerAvailable(task.pubsubTopic)
|
||||||
|
|
||||||
method sendImpl*(
|
method sendImpl*(
|
||||||
|
|||||||
@ -26,8 +26,8 @@ proc new*(
|
|||||||
return
|
return
|
||||||
RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet)
|
RelaySendProcessor(publishProc: publishProc, fallbackStateToSet: fallbackStateToSet)
|
||||||
|
|
||||||
proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} =
|
proc isTopicHealthy(topic: PubsubTopic): bool {.gcsafe.} =
|
||||||
let healthReport = (await RequestRelayTopicsHealth.request(@[topic])).valueOr:
|
let healthReport = RequestRelayTopicsHealth.request(@[topic]).valueOr:
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if healthReport.topicHealth.len() < 1:
|
if healthReport.topicHealth.len() < 1:
|
||||||
@ -37,8 +37,8 @@ proc isTopicHealthy(topic: PubsubTopic): Future[bool] {.async.} =
|
|||||||
|
|
||||||
method isValidProcessor*(
|
method isValidProcessor*(
|
||||||
self: RelaySendProcessor, task: DeliveryTask
|
self: RelaySendProcessor, task: DeliveryTask
|
||||||
): Future[bool] {.async.} =
|
): bool {.gcsafe.} =
|
||||||
return await isTopicHealthy(task.pubsubTopic)
|
return isTopicHealthy(task.pubsubTopic)
|
||||||
|
|
||||||
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
|
method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} =
|
||||||
task.tryCount.inc()
|
task.tryCount.inc()
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
import chronos
|
import chronos
|
||||||
import ./delivery_task
|
import ./delivery_task
|
||||||
|
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
type BaseSendProcessor* = ref object of RootObj
|
type BaseSendProcessor* = ref object of RootObj
|
||||||
fallbackProcessor*: BaseSendProcessor
|
fallbackProcessor*: BaseSendProcessor
|
||||||
|
|
||||||
@ -9,7 +11,7 @@ proc chain*(self: BaseSendProcessor, next: BaseSendProcessor) =
|
|||||||
|
|
||||||
method isValidProcessor*(
|
method isValidProcessor*(
|
||||||
self: BaseSendProcessor, task: DeliveryTask
|
self: BaseSendProcessor, task: DeliveryTask
|
||||||
): Future[bool] {.async, base.} =
|
): bool {.base, gcsafe.} =
|
||||||
return false
|
return false
|
||||||
|
|
||||||
method sendImpl*(
|
method sendImpl*(
|
||||||
@ -23,7 +25,7 @@ method process*(
|
|||||||
var currentProcessor: BaseSendProcessor = self
|
var currentProcessor: BaseSendProcessor = self
|
||||||
var keepTrying = true
|
var keepTrying = true
|
||||||
while not currentProcessor.isNil() and keepTrying:
|
while not currentProcessor.isNil() and keepTrying:
|
||||||
if await currentProcessor.isValidProcessor(task):
|
if currentProcessor.isValidProcessor(task):
|
||||||
await currentProcessor.sendImpl(task)
|
await currentProcessor.sendImpl(task)
|
||||||
currentProcessor = currentProcessor.fallbackProcessor
|
currentProcessor = currentProcessor.fallbackProcessor
|
||||||
keepTrying = task.state == DeliveryState.FallbackRetry
|
keepTrying = task.state == DeliveryState.FallbackRetry
|
||||||
|
|||||||
@ -6,17 +6,15 @@ import waku/waku_core/topics
|
|||||||
|
|
||||||
export protocol_health, topic_health
|
export protocol_health, topic_health
|
||||||
|
|
||||||
RequestBroker:
|
RequestBroker(sync):
|
||||||
type RequestNodeHealth* = object
|
type RequestNodeHealth* = object
|
||||||
healthStatus*: NodeHealth
|
healthStatus*: NodeHealth
|
||||||
|
|
||||||
RequestBroker:
|
RequestBroker(sync):
|
||||||
type RequestRelayTopicsHealth* = object
|
type RequestRelayTopicsHealth* = object
|
||||||
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
|
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
|
||||||
|
|
||||||
proc signature(
|
proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string]
|
||||||
topics: seq[PubsubTopic]
|
|
||||||
): Future[Result[RequestRelayTopicsHealth, string]] {.async.}
|
|
||||||
|
|
||||||
MultiRequestBroker:
|
MultiRequestBroker:
|
||||||
type RequestProtocolHealth* = object
|
type RequestProtocolHealth* = object
|
||||||
|
|||||||
@ -326,9 +326,7 @@ proc initRelayObservers(w: WakuRelay) =
|
|||||||
|
|
||||||
proc initRequestProviders(w: WakuRelay) =
|
proc initRequestProviders(w: WakuRelay) =
|
||||||
RequestRelayTopicsHealth.setProvider(
|
RequestRelayTopicsHealth.setProvider(
|
||||||
proc(
|
proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] =
|
||||||
topics: seq[PubsubTopic]
|
|
||||||
): Future[Result[RequestRelayTopicsHealth, string]] {.async.} =
|
|
||||||
var collectedRes: RequestRelayTopicsHealth
|
var collectedRes: RequestRelayTopicsHealth
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
|
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user